1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *	  Infrastructure for launching parallel workers
5  *
6  * Portions Copyright (c) 1996-2016, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *	  src/backend/access/transam/parallel.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/parallel.h"
18 #include "access/xact.h"
19 #include "access/xlog.h"
20 #include "catalog/namespace.h"
21 #include "commands/async.h"
22 #include "executor/execParallel.h"
23 #include "libpq/libpq.h"
24 #include "libpq/pqformat.h"
25 #include "libpq/pqmq.h"
26 #include "miscadmin.h"
27 #include "optimizer/planmain.h"
28 #include "storage/ipc.h"
29 #include "storage/sinval.h"
30 #include "storage/spin.h"
31 #include "tcop/tcopprot.h"
32 #include "utils/combocid.h"
33 #include "utils/guc.h"
34 #include "utils/inval.h"
35 #include "utils/memutils.h"
36 #include "utils/resowner.h"
37 #include "utils/snapmgr.h"
38 
39 
/*
 * Size, in bytes, of the error queue set up for each parallel worker.
 *
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 *
 * InitializeParallelDSM() statically asserts that this value is
 * buffer-aligned, so any new value must satisfy BUFFERALIGN(x) == x.
 */
#define PARALLEL_ERROR_QUEUE_SIZE			16384

/* Magic number for parallel context TOC. */
#define PARALLEL_MAGIC						0x50477c7c

/*
 * Magic numbers for parallel state sharing.  Higher-level code should use
 * smaller values, leaving these very large ones for use by this module.
 *
 * Each key identifies one chunk that InitializeParallelDSM() inserts into
 * the parallel context's table of contents.  PARALLEL_KEY_FIXED is always
 * present; the others are inserted only when at least one worker is being
 * budgeted for, and PARALLEL_KEY_ENTRYPOINT only when the context names its
 * entry point by library and function name.
 */
#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY				UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
65 
/*
 * Fixed-size parallel state.
 *
 * One instance is stored in the parallel context's table of contents under
 * PARALLEL_KEY_FIXED.  InitializeParallelDSM() fills in every field from the
 * state of the backend that is setting up the parallel operation.
 */
typedef struct FixedParallelState
{
	/* Fixed-size state that workers must restore. */
	Oid			database_id;
	Oid			authenticated_user_id;
	Oid			current_user_id;
	Oid			outer_user_id;
	Oid			temp_namespace_id;
	Oid			temp_toast_namespace_id;
	int			sec_context;
	bool		is_superuser;
	PGPROC	   *parallel_master_pgproc;
	pid_t		parallel_master_pid;
	BackendId	parallel_master_backend_id;
	TimestampTz xact_ts;
	TimestampTz stmt_ts;

	/*
	 * Entrypoint for parallel workers, given as a function address
	 * (deprecated; see the note on CreateParallelContext).  This is NULL for
	 * contexts made by CreateParallelContextForExternalFunction, which
	 * identify the entry point by name under PARALLEL_KEY_ENTRYPOINT.
	 */
	parallel_worker_main_type entrypoint;

	/* Mutex protects remaining fields. */
	slock_t		mutex;

	/*
	 * Maximum XactLastRecEnd of any worker.  Reset to zero when the state is
	 * (re)initialized; WaitForParallelWorkersToFinish() advances the local
	 * XactLastRecEnd to this value if it is larger.
	 */
	XLogRecPtr	last_xlog_end;
} FixedParallelState;
93 
/*
 * Our parallel worker number.  We initialize this to -1, meaning that we are
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int			ParallelWorkerNumber = -1;

/*
 * Is there a parallel message pending which we need to receive?
 *
 * Set by HandleParallelMessageInterrupt() (which runs inside a signal
 * handler, hence volatile) and cleared by HandleParallelMessages().
 */
volatile bool ParallelMessagePending = false;

/* Are we initializing a parallel worker? */
bool		InitializingParallelWorker = false;

/* Pointer to our fixed parallel state. */
static FixedParallelState *MyFixedParallelState;

/*
 * List of active parallel contexts.  CreateParallelContext() pushes each new
 * context onto the head; DestroyParallelContext() unlinks it.
 */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/*
 * Backend-local copy of data from FixedParallelState (presumably its
 * parallel_master_pid field; the assignment is not in this part of the
 * file).
 */
static pid_t ParallelMasterPid;
116 
/*
 * List of internal parallel worker entry points.  We need this for
 * reasons explained in LookupParallelWorkerFunction(), below.
 *
 * Each entry pairs a function's name, as a string, with its address.
 */
static const struct
{
	const char *fn_name;
	parallel_worker_main_type fn_addr;
}	InternalParallelWorkers[] =

{
	{
		"ParallelQueryMain", ParallelQueryMain
	}
};
132 
/*
 * Private functions.
 *
 * HandleParallelMessage processes one protocol message received from worker
 * number 'i' of 'pcxt'; WaitForParallelWorkersToExit blocks until every
 * launched worker of 'pcxt' has shut down.  The other two are defined
 * further down in this file.
 */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(char *libraryname, char *funcname);
static void ParallelWorkerShutdown(int code, Datum arg);
138 
139 
140 /*
141  * Establish a new parallel context.  This should be done after entering
142  * parallel mode, and (unless there is an error) the context should be
143  * destroyed before exiting the current subtransaction.
144  *
145  * NB: specifying the entrypoint as a function address is unportable.
146  * This will go away in Postgres 10, in favor of the API provided by
147  * CreateParallelContextForExternalFunction; in the meantime use that.
148  */
149 ParallelContext *
CreateParallelContext(parallel_worker_main_type entrypoint,int nworkers)150 CreateParallelContext(parallel_worker_main_type entrypoint, int nworkers)
151 {
152 	MemoryContext oldcontext;
153 	ParallelContext *pcxt;
154 
155 	/* It is unsafe to create a parallel context if not in parallel mode. */
156 	Assert(IsInParallelMode());
157 
158 	/* Number of workers should be non-negative. */
159 	Assert(nworkers >= 0);
160 
161 	/*
162 	 * If dynamic shared memory is not available, we won't be able to use
163 	 * background workers.
164 	 */
165 	if (dynamic_shared_memory_type == DSM_IMPL_NONE)
166 		nworkers = 0;
167 
168 	/*
169 	 * If we are running under serializable isolation, we can't use parallel
170 	 * workers, at least not until somebody enhances that mechanism to be
171 	 * parallel-aware.
172 	 */
173 	if (IsolationIsSerializable())
174 		nworkers = 0;
175 
176 	/* We might be running in a short-lived memory context. */
177 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
178 
179 	/* Initialize a new ParallelContext. */
180 	pcxt = palloc0(sizeof(ParallelContext));
181 	pcxt->subid = GetCurrentSubTransactionId();
182 	pcxt->nworkers = nworkers;
183 	pcxt->entrypoint = entrypoint;
184 	pcxt->error_context_stack = error_context_stack;
185 	shm_toc_initialize_estimator(&pcxt->estimator);
186 	dlist_push_head(&pcxt_list, &pcxt->node);
187 
188 	/* Restore previous memory context. */
189 	MemoryContextSwitchTo(oldcontext);
190 
191 	return pcxt;
192 }
193 
194 /*
195  * Establish a new parallel context that calls a function specified by name.
196  * Unlike CreateParallelContext, this is robust against possible differences
197  * in address space layout between different processes.
198  */
199 ParallelContext *
CreateParallelContextForExternalFunction(char * library_name,char * function_name,int nworkers)200 CreateParallelContextForExternalFunction(char *library_name,
201 										 char *function_name,
202 										 int nworkers)
203 {
204 	MemoryContext oldcontext;
205 	ParallelContext *pcxt;
206 
207 	/* We might be running in a very short-lived memory context. */
208 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
209 
210 	/* Create the context. */
211 	pcxt = CreateParallelContext(NULL, nworkers);
212 	pcxt->library_name = pstrdup(library_name);
213 	pcxt->function_name = pstrdup(function_name);
214 
215 	/* Restore previous memory context. */
216 	MemoryContextSwitchTo(oldcontext);
217 
218 	return pcxt;
219 }
220 
/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 *
 * The function works in three phases: (1) add up the space and key count
 * needed in pcxt->estimator, (2) create the segment, or fall back to
 * backend-private memory, and build a table of contents in it, and (3)
 * allocate and fill each chunk.  Phases 1 and 3 must stay in step: every
 * chunk allocated below has a matching estimate above.
 *
 * On return pcxt->toc is always set.  pcxt->nworkers is forced to zero if no
 * segment could be created; in that case only the fixed-size state is
 * stored, in memory pointed to by pcxt->private_memory.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	Size		library_len = 0;
	Size		guc_len = 0;
	Size		combocidlen = 0;
	Size		tsnaplen = 0;
	Size		asnaplen = 0;
	Size		tstatelen = 0;
	Size		segsize = 0;
	int			i;
	FixedParallelState *fps;
	Snapshot	transaction_snapshot = GetTransactionSnapshot();
	Snapshot	active_snapshot = GetActiveSnapshot();

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Allow space to store the fixed-size parallel state. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Normally, the user will have requested at least one worker process, but
	 * if by chance they have not, we can skip a bunch of things here.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Estimate space for various kinds of state sharing. */
		library_len = EstimateLibraryStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
		guc_len = EstimateGUCStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
		combocidlen = EstimateComboCIDStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
		tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
		asnaplen = EstimateSnapshotSpace(active_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
		tstatelen = EstimateTransactionStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
		/* If you add more chunks here, you probably need to add keys. */
		/* Six keys: library, GUC, combo CID, two snapshots, xact state. */
		shm_toc_estimate_keys(&pcxt->estimator, 6);

		/*
		 * Estimate space needed for error queues: one queue of
		 * PARALLEL_ERROR_QUEUE_SIZE bytes per worker, all stored in a single
		 * chunk under a single key.
		 */
		StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
						 PARALLEL_ERROR_QUEUE_SIZE,
						 "parallel error queue size not buffer-aligned");
		shm_toc_estimate_chunk(&pcxt->estimator,
							   mul_size(PARALLEL_ERROR_QUEUE_SIZE,
										pcxt->nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);

		/*
		 * Estimate how much we'll need for entrypoint info.  The two names
		 * are stored back to back, each with its own terminating NUL, hence
		 * the "+ 2".
		 */
		if (pcxt->library_name != NULL)
		{
			Assert(pcxt->function_name != NULL);
			shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name)
								   + strlen(pcxt->function_name) + 2);
			shm_toc_estimate_keys(&pcxt->estimator, 1);
		}
	}

	/*
	 * Create DSM and initialize with new table of contents.  But if the user
	 * didn't request any workers, then don't bother creating a dynamic shared
	 * memory segment; instead, just use backend-private memory.
	 *
	 * Also, if we can't create a dynamic shared memory segment because the
	 * maximum number of segments have already been created, then fall back to
	 * backend-private memory, and plan not to use any workers.  We hope this
	 * won't happen very often, but it's better to abandon the use of
	 * parallelism than to fail outright.
	 */
	segsize = shm_toc_estimate(&pcxt->estimator);
	if (pcxt->nworkers > 0)
		pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (pcxt->seg != NULL)
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
								   dsm_segment_address(pcxt->seg),
								   segsize);
	else
	{
		/*
		 * No segment.  Note that the private copy is allocated in
		 * TopMemoryContext rather than the current context;
		 * DestroyParallelContext() frees it explicitly.
		 */
		pcxt->nworkers = 0;
		pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
								   segsize);
	}

	/* Initialize fixed-size state in shared memory. */
	fps = (FixedParallelState *)
		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
	fps->database_id = MyDatabaseId;
	fps->authenticated_user_id = GetAuthenticatedUserId();
	fps->outer_user_id = GetCurrentRoleId();
	fps->is_superuser = session_auth_is_superuser;
	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
	GetTempNamespaceState(&fps->temp_namespace_id,
						  &fps->temp_toast_namespace_id);
	fps->parallel_master_pgproc = MyProc;
	fps->parallel_master_pid = MyProcPid;
	fps->parallel_master_backend_id = MyBackendId;
	fps->xact_ts = GetCurrentTransactionStartTimestamp();
	fps->stmt_ts = GetCurrentStatementStartTimestamp();
	fps->entrypoint = pcxt->entrypoint;
	SpinLockInit(&fps->mutex);
	fps->last_xlog_end = 0;
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

	/* We can skip the rest of this if we're not budgeting for any workers. */
	if (pcxt->nworkers > 0)
	{
		char	   *libraryspace;
		char	   *gucspace;
		char	   *combocidspace;
		char	   *tsnapspace;
		char	   *asnapspace;
		char	   *tstatespace;
		char	   *error_queue_space;

		/* Serialize shared libraries we have loaded. */
		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
		SerializeLibraryState(library_len, libraryspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

		/* Serialize GUC settings. */
		gucspace = shm_toc_allocate(pcxt->toc, guc_len);
		SerializeGUCState(guc_len, gucspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

		/* Serialize combo CID state. */
		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
		SerializeComboCIDState(combocidlen, combocidspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

		/* Serialize transaction snapshot and active snapshot. */
		tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
		SerializeSnapshot(transaction_snapshot, tsnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
					   tsnapspace);
		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
		SerializeSnapshot(active_snapshot, asnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

		/* Serialize transaction state. */
		tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
		SerializeTransactionState(tstatelen, tstatespace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

		/* Allocate space for worker information (backend-local, zeroed). */
		pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

		/*
		 * Establish error queues in dynamic shared memory.
		 *
		 * These queues should be used only for transmitting ErrorResponse,
		 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
		 * should be transmitted via separate (possibly larger?) queues.
		 *
		 * Queue i occupies bytes [i * PARALLEL_ERROR_QUEUE_SIZE,
		 * (i + 1) * PARALLEL_ERROR_QUEUE_SIZE) of the chunk.  This backend is
		 * the receiver of every queue.  No worker handle exists yet, so NULL
		 * is passed to shm_mq_attach; LaunchParallelWorkers() supplies the
		 * handle later via shm_mq_set_handle.
		 */
		error_queue_space =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
									  pcxt->nworkers));
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

		/*
		 * Serialize entrypoint information: the library name, its NUL, then
		 * the function name and its NUL, in one chunk.
		 */
		if (pcxt->library_name != NULL)
		{
			Size		lnamelen = strlen(pcxt->library_name);
			char	   *extensionstate;

			extensionstate = shm_toc_allocate(pcxt->toc, lnamelen
										  + strlen(pcxt->function_name) + 2);
			strcpy(extensionstate, pcxt->library_name);
			strcpy(extensionstate + lnamelen + 1, pcxt->function_name);
			shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT,
						   extensionstate);
		}
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
420 
421 /*
422  * Reinitialize the dynamic shared memory segment for a parallel context such
423  * that we could launch workers for it again.
424  */
425 void
ReinitializeParallelDSM(ParallelContext * pcxt)426 ReinitializeParallelDSM(ParallelContext *pcxt)
427 {
428 	FixedParallelState *fps;
429 	char	   *error_queue_space;
430 	int			i;
431 
432 	/* Wait for any old workers to exit. */
433 	if (pcxt->nworkers_launched > 0)
434 	{
435 		WaitForParallelWorkersToFinish(pcxt);
436 		WaitForParallelWorkersToExit(pcxt);
437 		pcxt->nworkers_launched = 0;
438 		if (pcxt->any_message_received)
439 		{
440 			pfree(pcxt->any_message_received);
441 			pcxt->any_message_received = NULL;
442 		}
443 	}
444 
445 	/* Reset a few bits of fixed parallel state to a clean state. */
446 	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
447 	fps->last_xlog_end = 0;
448 
449 	/* Recreate error queues. */
450 	error_queue_space =
451 		shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE);
452 	for (i = 0; i < pcxt->nworkers; ++i)
453 	{
454 		char	   *start;
455 		shm_mq	   *mq;
456 
457 		start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
458 		mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
459 		shm_mq_set_receiver(mq, MyProc);
460 		pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
461 	}
462 }
463 
464 /*
465  * Launch parallel workers.
466  */
467 void
LaunchParallelWorkers(ParallelContext * pcxt)468 LaunchParallelWorkers(ParallelContext *pcxt)
469 {
470 	MemoryContext oldcontext;
471 	BackgroundWorker worker;
472 	int			i;
473 	bool		any_registrations_failed = false;
474 
475 	/* Skip this if we have no workers. */
476 	if (pcxt->nworkers == 0)
477 		return;
478 
479 	/* We need to be a lock group leader. */
480 	BecomeLockGroupLeader();
481 
482 	/* If we do have workers, we'd better have a DSM segment. */
483 	Assert(pcxt->seg != NULL);
484 
485 	/* We might be running in a short-lived memory context. */
486 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
487 
488 	/* Configure a worker. */
489 	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
490 			 MyProcPid);
491 	worker.bgw_flags =
492 		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION;
493 	worker.bgw_start_time = BgWorkerStart_ConsistentState;
494 	worker.bgw_restart_time = BGW_NEVER_RESTART;
495 	worker.bgw_main = NULL;
496 	sprintf(worker.bgw_library_name, "postgres");
497 	sprintf(worker.bgw_function_name, "ParallelWorkerMain");
498 	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
499 	worker.bgw_notify_pid = MyProcPid;
500 	memset(&worker.bgw_extra, 0, BGW_EXTRALEN);
501 
502 	/*
503 	 * Start workers.
504 	 *
505 	 * The caller must be able to tolerate ending up with fewer workers than
506 	 * expected, so there is no need to throw an error here if registration
507 	 * fails.  It wouldn't help much anyway, because registering the worker in
508 	 * no way guarantees that it will start up and initialize successfully.
509 	 */
510 	for (i = 0; i < pcxt->nworkers; ++i)
511 	{
512 		memcpy(worker.bgw_extra, &i, sizeof(int));
513 		if (!any_registrations_failed &&
514 			RegisterDynamicBackgroundWorker(&worker,
515 											&pcxt->worker[i].bgwhandle))
516 		{
517 			shm_mq_set_handle(pcxt->worker[i].error_mqh,
518 							  pcxt->worker[i].bgwhandle);
519 			pcxt->nworkers_launched++;
520 		}
521 		else
522 		{
523 			/*
524 			 * If we weren't able to register the worker, then we've bumped up
525 			 * against the max_worker_processes limit, and future
526 			 * registrations will probably fail too, so arrange to skip them.
527 			 * But we still have to execute this code for the remaining slots
528 			 * to make sure that we forget about the error queues we budgeted
529 			 * for those workers.  Otherwise, we'll wait for them to start,
530 			 * but they never will.
531 			 */
532 			any_registrations_failed = true;
533 			pcxt->worker[i].bgwhandle = NULL;
534 			pfree(pcxt->worker[i].error_mqh);
535 			pcxt->worker[i].error_mqh = NULL;
536 		}
537 	}
538 
539 	/*
540 	 * Now that nworkers_launched has taken its final value, we can initialize
541 	 * any_message_received.
542 	 */
543 	if (pcxt->nworkers_launched > 0)
544 		pcxt->any_message_received =
545 			palloc0(sizeof(bool) * pcxt->nworkers_launched);
546 
547 	/* Restore previous memory context. */
548 	MemoryContextSwitchTo(oldcontext);
549 }
550 
/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 *
 * The loop ends once every launched worker's error_mqh has become NULL.
 * An ERROR is raised if a worker is found stopped without ever having
 * attached to its error queue.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
	for (;;)
	{
		bool		anyone_alive = false;
		int			nfinished = 0;
		int			i;

		/*
		 * This will process any parallel messages that are pending, which may
		 * change the outcome of the loop that follows.  It may also throw an
		 * error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.  If we have received a message through error_mqh from
			 * the worker, we know it started up cleanly, and therefore we're
			 * certain to be notified when it exits.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
				++nfinished;
			else if (pcxt->any_message_received[i])
			{
				/* One live worker is enough reason to go back to sleep. */
				anyone_alive = true;
				break;
			}
		}

		if (!anyone_alive)
		{
			/* If all workers are known to have finished, we're done. */
			if (nfinished >= pcxt->nworkers_launched)
			{
				Assert(nfinished == pcxt->nworkers_launched);
				break;
			}

			/*
			 * We didn't detect any living workers, but not all workers are
			 * known to have exited cleanly.  Either not all workers have
			 * launched yet, or maybe some of them failed to start or
			 * terminated abnormally.
			 */
			for (i = 0; i < pcxt->nworkers_launched; ++i)
			{
				pid_t		pid;
				shm_mq	   *mq;

				/*
				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
				 * should just keep waiting.  If it is BGWH_STOPPED, then
				 * further investigation is needed.
				 */
				if (pcxt->worker[i].error_mqh == NULL ||
					pcxt->worker[i].bgwhandle == NULL ||
					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
										   &pid) != BGWH_STOPPED)
					continue;

				/*
				 * Check whether the worker ended up stopped without ever
				 * attaching to the error queue.  If so, the postmaster was
				 * unable to fork the worker or it exited without initializing
				 * properly.  We must throw an error, since the caller may
				 * have been expecting the worker to do some work before
				 * exiting.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				/*
				 * The worker is stopped, but is attached to the error queue.
				 * Unless there's a bug somewhere, this will only happen when
				 * the worker writes messages and terminates after the
				 * CHECK_FOR_INTERRUPTS() near the top of this function and
				 * before the call to GetBackgroundWorkerPid().  In that case,
				 * our latch should have been set as well and the right things
				 * will happen on the next pass through the loop.
				 */
			}
		}

		/* Sleep until our latch is set, then re-examine the workers. */
		WaitLatch(&MyProc->procLatch, WL_LATCH_SET, -1);
		ResetLatch(&MyProc->procLatch);
	}

	/* Advance our XactLastRecEnd to the largest value the workers reported. */
	if (pcxt->toc != NULL)
	{
		FixedParallelState *fps;

		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED);
		if (fps->last_xlog_end > XactLastRecEnd)
			XactLastRecEnd = fps->last_xlog_end;
	}
}
666 
667 /*
668  * Wait for all workers to exit.
669  *
670  * This function ensures that workers have been completely shutdown.  The
671  * difference between WaitForParallelWorkersToFinish and this function is
672  * that former just ensures that last message sent by worker backend is
673  * received by master backend whereas this ensures the complete shutdown.
674  */
675 static void
WaitForParallelWorkersToExit(ParallelContext * pcxt)676 WaitForParallelWorkersToExit(ParallelContext *pcxt)
677 {
678 	int			i;
679 
680 	/* Wait until the workers actually die. */
681 	for (i = 0; i < pcxt->nworkers_launched; ++i)
682 	{
683 		BgwHandleStatus status;
684 
685 		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
686 			continue;
687 
688 		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
689 
690 		/*
691 		 * If the postmaster kicked the bucket, we have no chance of cleaning
692 		 * up safely -- we won't be able to tell when our workers are actually
693 		 * dead.  This doesn't necessitate a PANIC since they will all abort
694 		 * eventually, but we can't safely continue this session.
695 		 */
696 		if (status == BGWH_POSTMASTER_DIED)
697 			ereport(FATAL,
698 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
699 				 errmsg("postmaster exited during a parallel transaction")));
700 
701 		/* Release memory. */
702 		pfree(pcxt->worker[i].bgwhandle);
703 		pcxt->worker[i].bgwhandle = NULL;
704 	}
705 }
706 
707 /*
708  * Destroy a parallel context.
709  *
710  * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
711  * first, before calling this function.  When this function is invoked, any
712  * remaining workers are forcibly killed; the dynamic shared memory segment
713  * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
714  */
715 void
DestroyParallelContext(ParallelContext * pcxt)716 DestroyParallelContext(ParallelContext *pcxt)
717 {
718 	int			i;
719 
720 	/*
721 	 * Be careful about order of operations here!  We remove the parallel
722 	 * context from the list before we do anything else; otherwise, if an
723 	 * error occurs during a subsequent step, we might try to nuke it again
724 	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
725 	 */
726 	dlist_delete(&pcxt->node);
727 
728 	/* Kill each worker in turn, and forget their error queues. */
729 	if (pcxt->worker != NULL)
730 	{
731 		for (i = 0; i < pcxt->nworkers_launched; ++i)
732 		{
733 			if (pcxt->worker[i].error_mqh != NULL)
734 			{
735 				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);
736 
737 				pfree(pcxt->worker[i].error_mqh);
738 				pcxt->worker[i].error_mqh = NULL;
739 			}
740 		}
741 	}
742 
743 	/*
744 	 * If we have allocated a shared memory segment, detach it.  This will
745 	 * implicitly detach the error queues, and any other shared memory queues,
746 	 * stored there.
747 	 */
748 	if (pcxt->seg != NULL)
749 	{
750 		dsm_detach(pcxt->seg);
751 		pcxt->seg = NULL;
752 	}
753 
754 	/*
755 	 * If this parallel context is actually in backend-private memory rather
756 	 * than shared memory, free that memory instead.
757 	 */
758 	if (pcxt->private_memory != NULL)
759 	{
760 		pfree(pcxt->private_memory);
761 		pcxt->private_memory = NULL;
762 	}
763 
764 	/*
765 	 * We can't finish transaction commit or abort until all of the workers
766 	 * have exited.  This means, in particular, that we can't respond to
767 	 * interrupts at this stage.
768 	 */
769 	HOLD_INTERRUPTS();
770 	WaitForParallelWorkersToExit(pcxt);
771 	RESUME_INTERRUPTS();
772 
773 	/* Free the worker array itself. */
774 	if (pcxt->worker != NULL)
775 	{
776 		pfree(pcxt->worker);
777 		pcxt->worker = NULL;
778 	}
779 
780 	/* Free memory. */
781 	if (pcxt->library_name)
782 		pfree(pcxt->library_name);
783 	if (pcxt->function_name)
784 		pfree(pcxt->function_name);
785 	pfree(pcxt);
786 }
787 
788 /*
789  * Are there any parallel contexts currently active?
790  */
791 bool
ParallelContextActive(void)792 ParallelContextActive(void)
793 {
794 	return !dlist_is_empty(&pcxt_list);
795 }
796 
/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler!  All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * HandleParallelMessages().
 *
 * Besides raising the two flags, this sets our latch, so that code sleeping
 * on the latch wakes up and gets a chance to process the message.
 */
void
HandleParallelMessageInterrupt(void)
{
	InterruptPending = true;
	ParallelMessagePending = true;
	SetLatch(MyLatch);
}
811 
812 /*
813  * Handle any queued protocol messages received from parallel workers.
814  */
815 void
HandleParallelMessages(void)816 HandleParallelMessages(void)
817 {
818 	dlist_iter	iter;
819 	MemoryContext oldcontext;
820 
821 	static MemoryContext hpm_context = NULL;
822 
823 	/*
824 	 * This is invoked from ProcessInterrupts(), and since some of the
825 	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
826 	 * for recursive calls if more signals are received while this runs.  It's
827 	 * unclear that recursive entry would be safe, and it doesn't seem useful
828 	 * even if it is safe, so let's block interrupts until done.
829 	 */
830 	HOLD_INTERRUPTS();
831 
832 	/*
833 	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
834 	 * don't want to risk leaking data into long-lived contexts, so let's do
835 	 * our work here in a private context that we can reset on each use.
836 	 */
837 	if (hpm_context == NULL)	/* first time through? */
838 		hpm_context = AllocSetContextCreate(TopMemoryContext,
839 											"HandleParallelMessages",
840 											ALLOCSET_DEFAULT_SIZES);
841 	else
842 		MemoryContextReset(hpm_context);
843 
844 	oldcontext = MemoryContextSwitchTo(hpm_context);
845 
846 	/* OK to process messages.  Reset the flag saying there are more to do. */
847 	ParallelMessagePending = false;
848 
849 	dlist_foreach(iter, &pcxt_list)
850 	{
851 		ParallelContext *pcxt;
852 		int			i;
853 
854 		pcxt = dlist_container(ParallelContext, node, iter.cur);
855 		if (pcxt->worker == NULL)
856 			continue;
857 
858 		for (i = 0; i < pcxt->nworkers_launched; ++i)
859 		{
860 			/*
861 			 * Read as many messages as we can from each worker, but stop when
862 			 * either (1) the worker's error queue goes away, which can happen
863 			 * if we receive a Terminate message from the worker; or (2) no
864 			 * more messages can be read from the worker without blocking.
865 			 */
866 			while (pcxt->worker[i].error_mqh != NULL)
867 			{
868 				shm_mq_result res;
869 				Size		nbytes;
870 				void	   *data;
871 
872 				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
873 									 &data, true);
874 				if (res == SHM_MQ_WOULD_BLOCK)
875 					break;
876 				else if (res == SHM_MQ_SUCCESS)
877 				{
878 					StringInfoData msg;
879 
880 					initStringInfo(&msg);
881 					appendBinaryStringInfo(&msg, data, nbytes);
882 					HandleParallelMessage(pcxt, i, &msg);
883 					pfree(msg.data);
884 				}
885 				else
886 					ereport(ERROR,
887 						  (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
888 						   errmsg("lost connection to parallel worker")));
889 			}
890 		}
891 	}
892 
893 	MemoryContextSwitchTo(oldcontext);
894 
895 	/* Might as well clear the context on our way out */
896 	MemoryContextReset(hpm_context);
897 
898 	RESUME_INTERRUPTS();
899 }
900 
901 /*
902  * Handle a single protocol message received from a single parallel worker.
903  */
904 static void
HandleParallelMessage(ParallelContext * pcxt,int i,StringInfo msg)905 HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
906 {
907 	char		msgtype;
908 
909 	if (pcxt->any_message_received != NULL)
910 		pcxt->any_message_received[i] = true;
911 
912 	msgtype = pq_getmsgbyte(msg);
913 
914 	switch (msgtype)
915 	{
916 		case 'K':				/* BackendKeyData */
917 			{
918 				int32		pid = pq_getmsgint(msg, 4);
919 
920 				(void) pq_getmsgint(msg, 4);	/* discard cancel key */
921 				(void) pq_getmsgend(msg);
922 				pcxt->worker[i].pid = pid;
923 				break;
924 			}
925 
926 		case 'E':				/* ErrorResponse */
927 		case 'N':				/* NoticeResponse */
928 			{
929 				ErrorData	edata;
930 				ErrorContextCallback *save_error_context_stack;
931 
932 				/* Parse ErrorResponse or NoticeResponse. */
933 				pq_parse_errornotice(msg, &edata);
934 
935 				/* Death of a worker isn't enough justification for suicide. */
936 				edata.elevel = Min(edata.elevel, ERROR);
937 
938 				/*
939 				 * If desired, add a context line to show that this is a
940 				 * message propagated from a parallel worker.  Otherwise, it
941 				 * can sometimes be confusing to understand what actually
942 				 * happened.  (We don't do this in FORCE_PARALLEL_REGRESS mode
943 				 * because it causes test-result instability depending on
944 				 * whether a parallel worker is actually used or not.)
945 				 */
946 				if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
947 				{
948 					if (edata.context)
949 						edata.context = psprintf("%s\n%s", edata.context,
950 												 _("parallel worker"));
951 					else
952 						edata.context = pstrdup(_("parallel worker"));
953 				}
954 
955 				/*
956 				 * Context beyond that should use the error context callbacks
957 				 * that were in effect when the ParallelContext was created,
958 				 * not the current ones.
959 				 */
960 				save_error_context_stack = error_context_stack;
961 				error_context_stack = pcxt->error_context_stack;
962 
963 				/* Rethrow error or print notice. */
964 				ThrowErrorData(&edata);
965 
966 				/* Not an error, so restore previous context stack. */
967 				error_context_stack = save_error_context_stack;
968 
969 				break;
970 			}
971 
972 		case 'A':				/* NotifyResponse */
973 			{
974 				/* Propagate NotifyResponse. */
975 				int32		pid;
976 				const char *channel;
977 				const char *payload;
978 
979 				pid = pq_getmsgint(msg, 4);
980 				channel = pq_getmsgrawstring(msg);
981 				payload = pq_getmsgrawstring(msg);
982 				pq_endmessage(msg);
983 
984 				NotifyMyFrontEnd(channel, payload, pid);
985 
986 				break;
987 			}
988 
989 		case 'X':				/* Terminate, indicating clean exit */
990 			{
991 				pfree(pcxt->worker[i].error_mqh);
992 				pcxt->worker[i].error_mqh = NULL;
993 				break;
994 			}
995 
996 		default:
997 			{
998 				elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
999 					 msgtype, msg->len);
1000 			}
1001 	}
1002 }
1003 
1004 /*
1005  * End-of-subtransaction cleanup for parallel contexts.
1006  *
1007  * Currently, it's forbidden to enter or leave a subtransaction while
1008  * parallel mode is in effect, so we could just blow away everything.  But
1009  * we may want to relax that restriction in the future, so this code
1010  * contemplates that there may be multiple subtransaction IDs in pcxt_list.
1011  */
1012 void
AtEOSubXact_Parallel(bool isCommit,SubTransactionId mySubId)1013 AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1014 {
1015 	while (!dlist_is_empty(&pcxt_list))
1016 	{
1017 		ParallelContext *pcxt;
1018 
1019 		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1020 		if (pcxt->subid != mySubId)
1021 			break;
1022 		if (isCommit)
1023 			elog(WARNING, "leaked parallel context");
1024 		DestroyParallelContext(pcxt);
1025 	}
1026 }
1027 
1028 /*
1029  * End-of-transaction cleanup for parallel contexts.
1030  */
1031 void
AtEOXact_Parallel(bool isCommit)1032 AtEOXact_Parallel(bool isCommit)
1033 {
1034 	while (!dlist_is_empty(&pcxt_list))
1035 	{
1036 		ParallelContext *pcxt;
1037 
1038 		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1039 		if (isCommit)
1040 			elog(WARNING, "leaked parallel context");
1041 		DestroyParallelContext(pcxt);
1042 	}
1043 }
1044 
/*
 * Main entrypoint for parallel workers.
 *
 * main_arg carries the handle of the dynamic shared memory segment to use;
 * it is decoded with DatumGetUInt32() and handed to dsm_attach().
 *
 * The function proceeds in three phases:
 *	1. Primary initialization: attach to the segment, locate the fixed
 *	   parallel state, hook up our error queue, and announce our PID with a
 *	   BackendKeyData ('K') message.
 *	2. State restoration: join the leader's lock group, then restore start
 *	   timestamps, the database connection, client encoding, libraries, GUCs,
 *	   transaction state, combo CIDs, snapshots, role/user identity and
 *	   temp-namespace state.
 *	3. Execution: run the caller-supplied entry point in parallel mode,
 *	   unwind the worker transaction, and send a Terminate ('X') message.
 *
 * The statement order below matters; see the individual comments.  The
 * function returns early, without sending 'X', if the lock group cannot be
 * joined.
 */
void
ParallelWorkerMain(Datum main_arg)
{
	dsm_segment *seg;
	shm_toc    *toc;
	FixedParallelState *fps;
	char	   *error_queue_space;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	char	   *libraryspace;
	char	   *entrypointstate;
	parallel_worker_main_type entrypt;
	char	   *gucspace;
	char	   *combocidspace;
	char	   *tsnapspace;
	char	   *asnapspace;
	char	   *tstatespace;
	StringInfoData msgbuf;

	/* Set flag to indicate that we're initializing a parallel worker. */
	InitializingParallelWorker = true;

	/* Establish signal handlers. */
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * Determine and set our parallel worker number.  It is read from the
	 * leading sizeof(int) bytes of our background worker entry's bgw_extra.
	 */
	Assert(ParallelWorkerNumber == -1);
	memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

	/* Set up a memory context and resource owner. */
	Assert(CurrentResourceOwner == NULL);
	CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
												 "Parallel worker",
												 ALLOCSET_DEFAULT_SIZES);

	/*
	 * Now that we have a resource owner, we can attach to the dynamic shared
	 * memory segment and read the table of contents.
	 */
	seg = dsm_attach(DatumGetUInt32(main_arg));
	if (seg == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not map dynamic shared memory segment")));
	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
	if (toc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
		   errmsg("invalid magic number in dynamic shared memory segment")));

	/* Look up fixed parallel state. */
	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED);
	Assert(fps != NULL);
	MyFixedParallelState = fps;

	/* Arrange to signal the leader if we exit. */
	ParallelMasterPid = fps->parallel_master_pid;
	ParallelMasterBackendId = fps->parallel_master_backend_id;
	on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

	/*
	 * Now we can find and attach to the error queue provided for us.  That's
	 * good, because until we do that, any errors that happen here will not be
	 * reported back to the process that requested that this worker be
	 * launched.
	 *
	 * The error queue area is indexed by worker number, with a stride of
	 * PARALLEL_ERROR_QUEUE_SIZE bytes per worker.
	 */
	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE);
	mq = (shm_mq *) (error_queue_space +
					 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
	shm_mq_set_sender(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);
	pq_redirect_to_shm_mq(seg, mqh);
	pq_set_parallel_master(fps->parallel_master_pid,
						   fps->parallel_master_backend_id);

	/*
	 * Send a BackendKeyData message to the process that initiated parallelism
	 * so that it has access to our PID before it receives any other messages
	 * from us.  Our cancel key is sent, too, since that's the way the
	 * protocol message is defined, but it won't actually be used for anything
	 * in this case.
	 */
	pq_beginmessage(&msgbuf, 'K');
	pq_sendint(&msgbuf, (int32) MyProcPid, sizeof(int32));
	pq_sendint(&msgbuf, (int32) MyCancelKey, sizeof(int32));
	pq_endmessage(&msgbuf);

	/*
	 * Hooray! Primary initialization is complete.  Now, we need to set up our
	 * backend-local state to match the original backend.
	 */

	/*
	 * Join locking group.  We must do this before anything that could try to
	 * acquire a heavyweight lock, because any heavyweight locks acquired to
	 * this point could block either directly against the parallel group
	 * leader or against some process which in turn waits for a lock that
	 * conflicts with the parallel group leader, causing an undetected
	 * deadlock.  (If we can't join the lock group, the leader has gone away,
	 * so just exit quietly.)
	 */
	if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
							   fps->parallel_master_pid))
		return;

	/*
	 * Restore transaction and statement start-time timestamps.  This must
	 * happen before anything that would start a transaction, else asserts in
	 * xact.c will fire.
	 */
	SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);

	/*
	 * Identify the entry point to be called.  In theory this could result in
	 * loading an additional library, though most likely the entry point is in
	 * the core backend or in a library we just loaded.
	 *
	 * When the entrypoint state is present, it holds the library name and
	 * the function name as two consecutive NUL-terminated strings.  When it
	 * is absent, fall back to the function pointer stored in the fixed state.
	 */
	entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT);
	if (entrypointstate != NULL)
	{
		char	   *library_name;
		char	   *function_name;

		library_name = entrypointstate;
		function_name = entrypointstate + strlen(library_name) + 1;

		entrypt = LookupParallelWorkerFunction(library_name, function_name);
	}
	else
		entrypt = fps->entrypoint;

	/* Restore database connection. */
	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
											  fps->authenticated_user_id);

	/*
	 * Set the client encoding to the database encoding, since that is what
	 * the leader will expect.
	 */
	SetClientEncoding(GetDatabaseEncoding());

	/*
	 * Load libraries that were loaded by original backend.  We want to do
	 * this before restoring GUCs, because the libraries might define custom
	 * variables.
	 *
	 * Library and GUC restoration both run inside the transaction started
	 * here, which is committed before the parallel-worker transaction proper
	 * is started further down.
	 */
	libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY);
	Assert(libraryspace != NULL);
	StartTransactionCommand();
	RestoreLibraryState(libraryspace);

	/* Restore GUC values from launching backend. */
	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC);
	Assert(gucspace != NULL);
	RestoreGUCState(gucspace);
	CommitTransactionCommand();

	/* Crank up a transaction state appropriate to a parallel worker. */
	tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE);
	StartParallelWorkerTransaction(tstatespace);

	/* Restore combo CID state. */
	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID);
	Assert(combocidspace != NULL);
	RestoreComboCIDState(combocidspace);

	/* Restore transaction snapshot. */
	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT);
	Assert(tsnapspace != NULL);
	RestoreTransactionSnapshot(RestoreSnapshot(tsnapspace),
							   fps->parallel_master_pgproc);

	/* Restore active snapshot. */
	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT);
	Assert(asnapspace != NULL);
	PushActiveSnapshot(RestoreSnapshot(asnapspace));

	/*
	 * We've changed which tuples we can see, and must therefore invalidate
	 * system caches.
	 */
	InvalidateSystemCaches();

	/*
	 * Restore current role id.  Skip verifying whether session user is
	 * allowed to become this role and blindly restore the leader's state for
	 * current role.
	 */
	SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);

	/* Restore user ID and security context. */
	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);

	/* Restore temp-namespace state to ensure search path matches leader's. */
	SetTempNamespaceState(fps->temp_namespace_id,
						  fps->temp_toast_namespace_id);

	/*
	 * We've initialized all of our state now; nothing should change
	 * hereafter.
	 */
	InitializingParallelWorker = false;
	EnterParallelMode();

	/*
	 * Time to do the real work: invoke the caller-supplied code.
	 *
	 * If you get a crash at this line, try using
	 * CreateParallelContextForExternalFunction instead of
	 * CreateParallelContext.
	 */
	entrypt(seg, toc);

	/* Must exit parallel mode to pop active snapshot. */
	ExitParallelMode();

	/* Must pop active snapshot so resowner.c doesn't complain. */
	PopActiveSnapshot();

	/* Shut down the parallel-worker transaction. */
	EndParallelWorkerTransaction();

	/*
	 * Report success.  The leader's HandleParallelMessage treats 'X' as
	 * Terminate, indicating a clean exit.
	 */
	pq_putmessage('X', NULL, 0);
}
1275 
1276 /*
1277  * Update shared memory with the ending location of the last WAL record we
1278  * wrote, if it's greater than the value already stored there.
1279  */
1280 void
ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)1281 ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1282 {
1283 	FixedParallelState *fps = MyFixedParallelState;
1284 
1285 	Assert(fps != NULL);
1286 	SpinLockAcquire(&fps->mutex);
1287 	if (fps->last_xlog_end < last_xlog_end)
1288 		fps->last_xlog_end = last_xlog_end;
1289 	SpinLockRelease(&fps->mutex);
1290 }
1291 
1292 /*
1293  * Make sure the leader tries to read from our error queue one more time.
1294  * This guards against the case where we exit uncleanly without sending an
1295  * ErrorResponse to the leader, for example because some code calls proc_exit
1296  * directly.
1297  */
1298 static void
ParallelWorkerShutdown(int code,Datum arg)1299 ParallelWorkerShutdown(int code, Datum arg)
1300 {
1301 	SendProcSignal(ParallelMasterPid,
1302 				   PROCSIG_PARALLEL_MESSAGE,
1303 				   ParallelMasterBackendId);
1304 }
1305 
1306 /*
1307  * Look up (and possibly load) a parallel worker entry point function.
1308  *
1309  * For functions contained in the core code, we use library name "postgres"
1310  * and consult the InternalParallelWorkers array.  External functions are
1311  * looked up, and loaded if necessary, using load_external_function().
1312  *
1313  * The point of this is to pass function names as strings across process
1314  * boundaries.  We can't pass actual function addresses because of the
1315  * possibility that the function has been loaded at a different address
1316  * in a different process.  This is obviously a hazard for functions in
1317  * loadable libraries, but it can happen even for functions in the core code
1318  * on platforms using EXEC_BACKEND (e.g., Windows).
1319  *
1320  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1321  * in favor of applying load_external_function() for core functions too;
1322  * but that raises portability issues that are not worth addressing now.
1323  */
1324 static parallel_worker_main_type
LookupParallelWorkerFunction(char * libraryname,char * funcname)1325 LookupParallelWorkerFunction(char *libraryname, char *funcname)
1326 {
1327 	/*
1328 	 * If the function is to be loaded from postgres itself, search the
1329 	 * InternalParallelWorkers array.
1330 	 */
1331 	if (strcmp(libraryname, "postgres") == 0)
1332 	{
1333 		int			i;
1334 
1335 		for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1336 		{
1337 			if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1338 				return InternalParallelWorkers[i].fn_addr;
1339 		}
1340 
1341 		/* We can only reach this by programming error. */
1342 		elog(ERROR, "internal function \"%s\" not found", funcname);
1343 	}
1344 
1345 	/* Otherwise load from external library. */
1346 	return (parallel_worker_main_type)
1347 		load_external_function(libraryname, funcname, true, NULL);
1348 }
1349