1 /*-------------------------------------------------------------------------
2  *
3  * parallel.c
4  *	  Infrastructure for launching parallel workers
5  *
6  * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
7  * Portions Copyright (c) 1994, Regents of the University of California
8  *
9  * IDENTIFICATION
10  *	  src/backend/access/transam/parallel.c
11  *
12  *-------------------------------------------------------------------------
13  */
14 
15 #include "postgres.h"
16 
17 #include "access/nbtree.h"
18 #include "access/parallel.h"
19 #include "access/session.h"
20 #include "access/xact.h"
21 #include "access/xlog.h"
22 #include "catalog/index.h"
23 #include "catalog/namespace.h"
24 #include "commands/async.h"
25 #include "executor/execParallel.h"
26 #include "libpq/libpq.h"
27 #include "libpq/pqformat.h"
28 #include "libpq/pqmq.h"
29 #include "miscadmin.h"
30 #include "optimizer/planmain.h"
31 #include "pgstat.h"
32 #include "storage/ipc.h"
33 #include "storage/sinval.h"
34 #include "storage/spin.h"
35 #include "tcop/tcopprot.h"
36 #include "utils/combocid.h"
37 #include "utils/guc.h"
38 #include "utils/inval.h"
39 #include "utils/memutils.h"
40 #include "utils/resowner.h"
41 #include "utils/snapmgr.h"
42 #include "utils/typcache.h"
43 
44 
/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 *
 * One queue of this size is carved out per worker from a single contiguous
 * shared-memory allocation, so InitializeParallelDSM() statically asserts
 * that this value is already buffer-aligned.
 */
#define PARALLEL_ERROR_QUEUE_SIZE			16384

/* Magic number for parallel context TOC (passed to shm_toc_create). */
#define PARALLEL_MAGIC						0x50477c7c

/*
 * Magic numbers for per-context parallel state sharing.  Higher-level code
 * should use smaller values, leaving these very large ones for use by this
 * module.
 *
 * Each key names one chunk that InitializeParallelDSM() inserts into the
 * TOC.  PARALLEL_KEY_FIXED is always present; the others are only inserted
 * when the context budgets at least one worker, and
 * PARALLEL_KEY_TRANSACTION_SNAPSHOT only when the isolation level uses a
 * transaction snapshot.
 */
#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY				UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM			UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_REINDEX_STATE			UINT64CONST(0xFFFFFFFFFFFF000B)
73 
/*
 * Fixed-size parallel state.
 *
 * The leader fills this in (InitializeParallelDSM) and stores it in the TOC
 * under PARALLEL_KEY_FIXED; it is present even when no workers are budgeted.
 */
typedef struct FixedParallelState
{
	/* Fixed-size state that workers must restore. */
	Oid			database_id;	/* leader's MyDatabaseId */
	Oid			authenticated_user_id;	/* GetAuthenticatedUserId() */
	Oid			current_user_id;	/* from GetUserIdAndSecContext() */
	Oid			outer_user_id;	/* GetCurrentRoleId() */
	Oid			temp_namespace_id;	/* from GetTempNamespaceState() */
	Oid			temp_toast_namespace_id;	/* from GetTempNamespaceState() */
	int			sec_context;	/* from GetUserIdAndSecContext() */
	bool		is_superuser;	/* session_auth_is_superuser */
	PGPROC	   *parallel_master_pgproc;	/* leader's MyProc */
	pid_t		parallel_master_pid;	/* leader's MyProcPid */
	BackendId	parallel_master_backend_id; /* leader's MyBackendId */
	TimestampTz xact_ts;		/* leader's transaction start timestamp */
	TimestampTz stmt_ts;		/* leader's statement start timestamp */

	/* Mutex protects remaining fields. */
	slock_t		mutex;

	/*
	 * Maximum XactLastRecEnd of any worker.  Reset to 0 whenever the DSM is
	 * (re)initialized; WaitForParallelWorkersToFinish() folds it into the
	 * leader's XactLastRecEnd.
	 */
	XLogRecPtr	last_xlog_end;
} FixedParallelState;
98 
99 /*
100  * Our parallel worker number.  We initialize this to -1, meaning that we are
101  * not a parallel worker.  In parallel workers, it will be set to a value >= 0
102  * and < the number of workers before any user code is invoked; each parallel
103  * worker will get a different parallel worker number.
104  */
105 int			ParallelWorkerNumber = -1;
106 
107 /* Is there a parallel message pending which we need to receive? */
108 volatile bool ParallelMessagePending = false;
109 
110 /* Are we initializing a parallel worker? */
111 bool		InitializingParallelWorker = false;
112 
113 /* Pointer to our fixed parallel state. */
114 static FixedParallelState *MyFixedParallelState;
115 
116 /* List of active parallel contexts. */
117 static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);
118 
119 /* Backend-local copy of data from FixedParallelState. */
120 static pid_t ParallelMasterPid;
121 
122 /*
123  * List of internal parallel worker entry points.  We need this for
124  * reasons explained in LookupParallelWorkerFunction(), below.
125  */
126 static const struct
127 {
128 	const char *fn_name;
129 	parallel_worker_main_type fn_addr;
130 }			InternalParallelWorkers[] =
131 
132 {
133 	{
134 		"ParallelQueryMain", ParallelQueryMain
135 	},
136 	{
137 		"_bt_parallel_build_main", _bt_parallel_build_main
138 	}
139 };
140 
141 /* Private functions. */
142 static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
143 static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
144 static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
145 static void ParallelWorkerShutdown(int code, Datum arg);
146 
147 
148 /*
149  * Establish a new parallel context.  This should be done after entering
150  * parallel mode, and (unless there is an error) the context should be
151  * destroyed before exiting the current subtransaction.
152  */
153 ParallelContext *
CreateParallelContext(const char * library_name,const char * function_name,int nworkers,bool serializable_okay)154 CreateParallelContext(const char *library_name, const char *function_name,
155 					  int nworkers, bool serializable_okay)
156 {
157 	MemoryContext oldcontext;
158 	ParallelContext *pcxt;
159 
160 	/* It is unsafe to create a parallel context if not in parallel mode. */
161 	Assert(IsInParallelMode());
162 
163 	/* Number of workers should be non-negative. */
164 	Assert(nworkers >= 0);
165 
166 	/*
167 	 * If dynamic shared memory is not available, we won't be able to use
168 	 * background workers.
169 	 */
170 	if (dynamic_shared_memory_type == DSM_IMPL_NONE)
171 		nworkers = 0;
172 
173 	/*
174 	 * If we are running under serializable isolation, we can't use parallel
175 	 * workers, at least not until somebody enhances that mechanism to be
176 	 * parallel-aware.  Utility statement callers may ask us to ignore this
177 	 * restriction because they're always able to safely ignore the fact that
178 	 * SIREAD locks do not work with parallelism.
179 	 */
180 	if (IsolationIsSerializable() && !serializable_okay)
181 		nworkers = 0;
182 
183 	/* We might be running in a short-lived memory context. */
184 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
185 
186 	/* Initialize a new ParallelContext. */
187 	pcxt = palloc0(sizeof(ParallelContext));
188 	pcxt->subid = GetCurrentSubTransactionId();
189 	pcxt->nworkers = nworkers;
190 	pcxt->library_name = pstrdup(library_name);
191 	pcxt->function_name = pstrdup(function_name);
192 	pcxt->error_context_stack = error_context_stack;
193 	shm_toc_initialize_estimator(&pcxt->estimator);
194 	dlist_push_head(&pcxt_list, &pcxt->node);
195 
196 	/* Restore previous memory context. */
197 	MemoryContextSwitchTo(oldcontext);
198 
199 	return pcxt;
200 }
201 
/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 *
 * This works in two passes over the same set of chunks: first each chunk's
 * size (and the number of TOC keys) is added to pcxt->estimator, then a
 * segment of the estimated total size is created and each chunk is
 * allocated, filled in, and inserted into the table of contents under its
 * PARALLEL_KEY_* key.  The two passes must stay in sync.
 *
 * pcxt->nworkers may be reduced to zero here, either because no per-session
 * DSM segment could be obtained or because dsm_create() returned NULL (it is
 * called with DSM_CREATE_NULL_IF_MAXSEGMENTS).  In that case the TOC is built
 * in backend-private memory allocated in TopMemoryContext, and only the
 * FixedParallelState is stored in it.
 *
 * When workers are budgeted, pcxt->worker is allocated with one entry per
 * worker, and each entry gets an error queue with this backend registered as
 * the receiver.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	Size		library_len = 0;
	Size		guc_len = 0;
	Size		combocidlen = 0;
	Size		tsnaplen = 0;
	Size		asnaplen = 0;
	Size		tstatelen = 0;
	Size		reindexlen = 0;
	Size		segsize = 0;
	int			i;
	FixedParallelState *fps;
	dsm_handle	session_dsm_handle = DSM_HANDLE_INVALID;
	Snapshot	transaction_snapshot = GetTransactionSnapshot();
	Snapshot	active_snapshot = GetActiveSnapshot();

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Allow space to store the fixed-size parallel state. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Normally, the user will have requested at least one worker process, but
	 * if by chance they have not, we can skip a bunch of things here.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Get (or create) the per-session DSM segment's handle. */
		session_dsm_handle = GetSessionDsmHandle();

		/*
		 * If we weren't able to create a per-session DSM segment, then we can
		 * continue but we can't safely launch any workers because their
		 * record typmods would be incompatible so they couldn't exchange
		 * tuples.
		 */
		if (session_dsm_handle == DSM_HANDLE_INVALID)
			pcxt->nworkers = 0;
	}

	/* First pass: size every chunk we will need if workers are budgeted. */
	if (pcxt->nworkers > 0)
	{
		/* Estimate space for various kinds of state sharing. */
		library_len = EstimateLibraryStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
		guc_len = EstimateGUCStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
		combocidlen = EstimateComboCIDStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
		if (IsolationUsesXactSnapshot())
		{
			tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
			shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
		}
		asnaplen = EstimateSnapshotSpace(active_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
		tstatelen = EstimateTransactionStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
		shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
		reindexlen = EstimateReindexStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);

		/*
		 * If you add more chunks here, you probably need to add keys.  The 8
		 * keys are: library, GUC, combo CID, transaction snapshot (counted
		 * even if unused), active snapshot, transaction state, session DSM
		 * handle, and reindex state.
		 */
		shm_toc_estimate_keys(&pcxt->estimator, 8);

		/* Estimate space need for error queues. */
		StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
						 PARALLEL_ERROR_QUEUE_SIZE,
						 "parallel error queue size not buffer-aligned");
		shm_toc_estimate_chunk(&pcxt->estimator,
							   mul_size(PARALLEL_ERROR_QUEUE_SIZE,
										pcxt->nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);

		/*
		 * Estimate how much we'll need for the entrypoint info: both names
		 * plus their two terminating NULs.
		 */
		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
							   strlen(pcxt->function_name) + 2);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/*
	 * Create DSM and initialize with new table of contents.  But if the user
	 * didn't request any workers, then don't bother creating a dynamic shared
	 * memory segment; instead, just use backend-private memory.
	 *
	 * Also, if we can't create a dynamic shared memory segment because the
	 * maximum number of segments have already been created, then fall back to
	 * backend-private memory, and plan not to use any workers.  We hope this
	 * won't happen very often, but it's better to abandon the use of
	 * parallelism than to fail outright.
	 */
	segsize = shm_toc_estimate(&pcxt->estimator);
	if (pcxt->nworkers > 0)
		pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (pcxt->seg != NULL)
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
								   dsm_segment_address(pcxt->seg),
								   segsize);
	else
	{
		pcxt->nworkers = 0;
		pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
								   segsize);
	}

	/* Initialize fixed-size state in shared memory. */
	fps = (FixedParallelState *)
		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
	fps->database_id = MyDatabaseId;
	fps->authenticated_user_id = GetAuthenticatedUserId();
	fps->outer_user_id = GetCurrentRoleId();
	fps->is_superuser = session_auth_is_superuser;
	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
	GetTempNamespaceState(&fps->temp_namespace_id,
						  &fps->temp_toast_namespace_id);
	fps->parallel_master_pgproc = MyProc;
	fps->parallel_master_pid = MyProcPid;
	fps->parallel_master_backend_id = MyBackendId;
	fps->xact_ts = GetCurrentTransactionStartTimestamp();
	fps->stmt_ts = GetCurrentStatementStartTimestamp();
	SpinLockInit(&fps->mutex);
	fps->last_xlog_end = 0;
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

	/*
	 * Second pass: allocate and fill each chunk sized above.  We can skip the
	 * rest of this if we're not budgeting for any workers.
	 */
	if (pcxt->nworkers > 0)
	{
		char	   *libraryspace;
		char	   *gucspace;
		char	   *combocidspace;
		char	   *tsnapspace;
		char	   *asnapspace;
		char	   *tstatespace;
		char	   *reindexspace;
		char	   *error_queue_space;
		char	   *session_dsm_handle_space;
		char	   *entrypointstate;
		Size		lnamelen;

		/* Serialize shared libraries we have loaded. */
		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
		SerializeLibraryState(library_len, libraryspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

		/* Serialize GUC settings. */
		gucspace = shm_toc_allocate(pcxt->toc, guc_len);
		SerializeGUCState(guc_len, gucspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

		/* Serialize combo CID state. */
		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
		SerializeComboCIDState(combocidlen, combocidspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

		/*
		 * Serialize the transaction snapshot if the transaction
		 * isolation-level uses a transaction snapshot.
		 */
		if (IsolationUsesXactSnapshot())
		{
			tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
			SerializeSnapshot(transaction_snapshot, tsnapspace);
			shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
						   tsnapspace);
		}

		/* Serialize the active snapshot. */
		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
		SerializeSnapshot(active_snapshot, asnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

		/* Provide the handle for per-session segment. */
		session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
													sizeof(dsm_handle));
		*(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
					   session_dsm_handle_space);

		/* Serialize transaction state. */
		tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
		SerializeTransactionState(tstatelen, tstatespace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

		/* Serialize reindex state. */
		reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
		SerializeReindexState(reindexlen, reindexspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);

		/*
		 * Allocate space for worker information (in TopTransactionContext,
		 * which is current here).
		 */
		pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

		/*
		 * Establish error queues in dynamic shared memory.
		 *
		 * These queues should be used only for transmitting ErrorResponse,
		 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
		 * should be transmitted via separate (possibly larger?) queues.
		 *
		 * Queue i occupies bytes [i * PARALLEL_ERROR_QUEUE_SIZE,
		 * (i + 1) * PARALLEL_ERROR_QUEUE_SIZE) of one contiguous chunk; this
		 * backend is the receiver of every queue.
		 */
		error_queue_space =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
									  pcxt->nworkers));
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

		/*
		 * Serialize entrypoint information.  It's unsafe to pass function
		 * pointers across processes, as the function pointer may be different
		 * in each process in EXEC_BACKEND builds, so we always pass library
		 * and function name.  (We use library name "postgres" for functions
		 * in the core backend.)
		 *
		 * Layout: library name, NUL, function name, NUL.
		 */
		lnamelen = strlen(pcxt->library_name);
		entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
										   strlen(pcxt->function_name) + 2);
		strcpy(entrypointstate, pcxt->library_name);
		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}
443 
444 /*
445  * Reinitialize the dynamic shared memory segment for a parallel context such
446  * that we could launch workers for it again.
447  */
448 void
ReinitializeParallelDSM(ParallelContext * pcxt)449 ReinitializeParallelDSM(ParallelContext *pcxt)
450 {
451 	FixedParallelState *fps;
452 
453 	/* Wait for any old workers to exit. */
454 	if (pcxt->nworkers_launched > 0)
455 	{
456 		WaitForParallelWorkersToFinish(pcxt);
457 		WaitForParallelWorkersToExit(pcxt);
458 		pcxt->nworkers_launched = 0;
459 		if (pcxt->known_attached_workers)
460 		{
461 			pfree(pcxt->known_attached_workers);
462 			pcxt->known_attached_workers = NULL;
463 			pcxt->nknown_attached_workers = 0;
464 		}
465 	}
466 
467 	/* Reset a few bits of fixed parallel state to a clean state. */
468 	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
469 	fps->last_xlog_end = 0;
470 
471 	/* Recreate error queues (if they exist). */
472 	if (pcxt->nworkers > 0)
473 	{
474 		char	   *error_queue_space;
475 		int			i;
476 
477 		error_queue_space =
478 			shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
479 		for (i = 0; i < pcxt->nworkers; ++i)
480 		{
481 			char	   *start;
482 			shm_mq	   *mq;
483 
484 			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
485 			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
486 			shm_mq_set_receiver(mq, MyProc);
487 			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
488 		}
489 	}
490 }
491 
492 /*
493  * Launch parallel workers.
494  */
495 void
LaunchParallelWorkers(ParallelContext * pcxt)496 LaunchParallelWorkers(ParallelContext *pcxt)
497 {
498 	MemoryContext oldcontext;
499 	BackgroundWorker worker;
500 	int			i;
501 	bool		any_registrations_failed = false;
502 
503 	/* Skip this if we have no workers. */
504 	if (pcxt->nworkers == 0)
505 		return;
506 
507 	/* We need to be a lock group leader. */
508 	BecomeLockGroupLeader();
509 
510 	/* If we do have workers, we'd better have a DSM segment. */
511 	Assert(pcxt->seg != NULL);
512 
513 	/* We might be running in a short-lived memory context. */
514 	oldcontext = MemoryContextSwitchTo(TopTransactionContext);
515 
516 	/* Configure a worker. */
517 	memset(&worker, 0, sizeof(worker));
518 	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
519 			 MyProcPid);
520 	snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
521 	worker.bgw_flags =
522 		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
523 		| BGWORKER_CLASS_PARALLEL;
524 	worker.bgw_start_time = BgWorkerStart_ConsistentState;
525 	worker.bgw_restart_time = BGW_NEVER_RESTART;
526 	sprintf(worker.bgw_library_name, "postgres");
527 	sprintf(worker.bgw_function_name, "ParallelWorkerMain");
528 	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
529 	worker.bgw_notify_pid = MyProcPid;
530 
531 	/*
532 	 * Start workers.
533 	 *
534 	 * The caller must be able to tolerate ending up with fewer workers than
535 	 * expected, so there is no need to throw an error here if registration
536 	 * fails.  It wouldn't help much anyway, because registering the worker in
537 	 * no way guarantees that it will start up and initialize successfully.
538 	 */
539 	for (i = 0; i < pcxt->nworkers; ++i)
540 	{
541 		memcpy(worker.bgw_extra, &i, sizeof(int));
542 		if (!any_registrations_failed &&
543 			RegisterDynamicBackgroundWorker(&worker,
544 											&pcxt->worker[i].bgwhandle))
545 		{
546 			shm_mq_set_handle(pcxt->worker[i].error_mqh,
547 							  pcxt->worker[i].bgwhandle);
548 			pcxt->nworkers_launched++;
549 		}
550 		else
551 		{
552 			/*
553 			 * If we weren't able to register the worker, then we've bumped up
554 			 * against the max_worker_processes limit, and future
555 			 * registrations will probably fail too, so arrange to skip them.
556 			 * But we still have to execute this code for the remaining slots
557 			 * to make sure that we forget about the error queues we budgeted
558 			 * for those workers.  Otherwise, we'll wait for them to start,
559 			 * but they never will.
560 			 */
561 			any_registrations_failed = true;
562 			pcxt->worker[i].bgwhandle = NULL;
563 			shm_mq_detach(pcxt->worker[i].error_mqh);
564 			pcxt->worker[i].error_mqh = NULL;
565 		}
566 	}
567 
568 	/*
569 	 * Now that nworkers_launched has taken its final value, we can initialize
570 	 * known_attached_workers.
571 	 */
572 	if (pcxt->nworkers_launched > 0)
573 	{
574 		pcxt->known_attached_workers =
575 			palloc0(sizeof(bool) * pcxt->nworkers_launched);
576 		pcxt->nknown_attached_workers = 0;
577 	}
578 
579 	/* Restore previous memory context. */
580 	MemoryContextSwitchTo(oldcontext);
581 }
582 
583 /*
584  * Wait for all workers to attach to their error queues, and throw an error if
585  * any worker fails to do this.
586  *
587  * Callers can assume that if this function returns successfully, then the
588  * number of workers given by pcxt->nworkers_launched have initialized and
589  * attached to their error queues.  Whether or not these workers are guaranteed
590  * to still be running depends on what code the caller asked them to run;
591  * this function does not guarantee that they have not exited.  However, it
592  * does guarantee that any workers which exited must have done so cleanly and
593  * after successfully performing the work with which they were tasked.
594  *
595  * If this function is not called, then some of the workers that were launched
596  * may not have been started due to a fork() failure, or may have exited during
597  * early startup prior to attaching to the error queue, so nworkers_launched
598  * cannot be viewed as completely reliable.  It will never be less than the
599  * number of workers which actually started, but it might be more.  Any workers
600  * that failed to start will still be discovered by
601  * WaitForParallelWorkersToFinish and an error will be thrown at that time,
602  * provided that function is eventually reached.
603  *
604  * In general, the leader process should do as much work as possible before
605  * calling this function.  fork() failures and other early-startup failures
606  * are very uncommon, and having the leader sit idle when it could be doing
607  * useful work is undesirable.  However, if the leader needs to wait for
608  * all of its workers or for a specific worker, it may want to call this
609  * function before doing so.  If not, it must make some other provision for
610  * the failure-to-start case, lest it wait forever.  On the other hand, a
611  * leader which never waits for a worker that might not be started yet, or
612  * at least never does so prior to WaitForParallelWorkersToFinish(), need not
613  * call this function at all.
614  */
615 void
WaitForParallelWorkersToAttach(ParallelContext * pcxt)616 WaitForParallelWorkersToAttach(ParallelContext *pcxt)
617 {
618 	int			i;
619 
620 	/* Skip this if we have no launched workers. */
621 	if (pcxt->nworkers_launched == 0)
622 		return;
623 
624 	for (;;)
625 	{
626 		/*
627 		 * This will process any parallel messages that are pending and it may
628 		 * also throw an error propagated from a worker.
629 		 */
630 		CHECK_FOR_INTERRUPTS();
631 
632 		for (i = 0; i < pcxt->nworkers_launched; ++i)
633 		{
634 			BgwHandleStatus status;
635 			shm_mq	   *mq;
636 			int			rc;
637 			pid_t		pid;
638 
639 			if (pcxt->known_attached_workers[i])
640 				continue;
641 
642 			/*
643 			 * If error_mqh is NULL, then the worker has already exited
644 			 * cleanly.
645 			 */
646 			if (pcxt->worker[i].error_mqh == NULL)
647 			{
648 				pcxt->known_attached_workers[i] = true;
649 				++pcxt->nknown_attached_workers;
650 				continue;
651 			}
652 
653 			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
654 			if (status == BGWH_STARTED)
655 			{
656 				/* Has the worker attached to the error queue? */
657 				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
658 				if (shm_mq_get_sender(mq) != NULL)
659 				{
660 					/* Yes, so it is known to be attached. */
661 					pcxt->known_attached_workers[i] = true;
662 					++pcxt->nknown_attached_workers;
663 				}
664 			}
665 			else if (status == BGWH_STOPPED)
666 			{
667 				/*
668 				 * If the worker stopped without attaching to the error queue,
669 				 * throw an error.
670 				 */
671 				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
672 				if (shm_mq_get_sender(mq) == NULL)
673 					ereport(ERROR,
674 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
675 							 errmsg("parallel worker failed to initialize"),
676 							 errhint("More details may be available in the server log.")));
677 
678 				pcxt->known_attached_workers[i] = true;
679 				++pcxt->nknown_attached_workers;
680 			}
681 			else
682 			{
683 				/*
684 				 * Worker not yet started, so we must wait.  The postmaster
685 				 * will notify us if the worker's state changes.  Our latch
686 				 * might also get set for some other reason, but if so we'll
687 				 * just end up waiting for the same worker again.
688 				 */
689 				rc = WaitLatch(MyLatch,
690 							   WL_LATCH_SET | WL_POSTMASTER_DEATH,
691 							   -1, WAIT_EVENT_BGWORKER_STARTUP);
692 
693 				/* emergency bailout if postmaster has died */
694 				if (rc & WL_POSTMASTER_DEATH)
695 					proc_exit(1);
696 
697 				if (rc & WL_LATCH_SET)
698 					ResetLatch(MyLatch);
699 			}
700 		}
701 
702 		/* If all workers are known to have started, we're done. */
703 		if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
704 		{
705 			Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
706 			break;
707 		}
708 	}
709 }
710 
711 /*
712  * Wait for all workers to finish computing.
713  *
714  * Even if the parallel operation seems to have completed successfully, it's
715  * important to call this function afterwards.  We must not miss any errors
716  * the workers may have thrown during the parallel operation, or any that they
717  * may yet throw while shutting down.
718  *
719  * Also, we want to update our notion of XactLastRecEnd based on worker
720  * feedback.
721  */
722 void
WaitForParallelWorkersToFinish(ParallelContext * pcxt)723 WaitForParallelWorkersToFinish(ParallelContext *pcxt)
724 {
725 	for (;;)
726 	{
727 		bool		anyone_alive = false;
728 		int			nfinished = 0;
729 		int			i;
730 
731 		/*
732 		 * This will process any parallel messages that are pending, which may
733 		 * change the outcome of the loop that follows.  It may also throw an
734 		 * error propagated from a worker.
735 		 */
736 		CHECK_FOR_INTERRUPTS();
737 
738 		for (i = 0; i < pcxt->nworkers_launched; ++i)
739 		{
740 			/*
741 			 * If error_mqh is NULL, then the worker has already exited
742 			 * cleanly.  If we have received a message through error_mqh from
743 			 * the worker, we know it started up cleanly, and therefore we're
744 			 * certain to be notified when it exits.
745 			 */
746 			if (pcxt->worker[i].error_mqh == NULL)
747 				++nfinished;
748 			else if (pcxt->known_attached_workers[i])
749 			{
750 				anyone_alive = true;
751 				break;
752 			}
753 		}
754 
755 		if (!anyone_alive)
756 		{
757 			/* If all workers are known to have finished, we're done. */
758 			if (nfinished >= pcxt->nworkers_launched)
759 			{
760 				Assert(nfinished == pcxt->nworkers_launched);
761 				break;
762 			}
763 
764 			/*
765 			 * We didn't detect any living workers, but not all workers are
766 			 * known to have exited cleanly.  Either not all workers have
767 			 * launched yet, or maybe some of them failed to start or
768 			 * terminated abnormally.
769 			 */
770 			for (i = 0; i < pcxt->nworkers_launched; ++i)
771 			{
772 				pid_t		pid;
773 				shm_mq	   *mq;
774 
775 				/*
776 				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
777 				 * should just keep waiting.  If it is BGWH_STOPPED, then
778 				 * further investigation is needed.
779 				 */
780 				if (pcxt->worker[i].error_mqh == NULL ||
781 					pcxt->worker[i].bgwhandle == NULL ||
782 					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
783 										   &pid) != BGWH_STOPPED)
784 					continue;
785 
786 				/*
787 				 * Check whether the worker ended up stopped without ever
788 				 * attaching to the error queue.  If so, the postmaster was
789 				 * unable to fork the worker or it exited without initializing
790 				 * properly.  We must throw an error, since the caller may
791 				 * have been expecting the worker to do some work before
792 				 * exiting.
793 				 */
794 				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
795 				if (shm_mq_get_sender(mq) == NULL)
796 					ereport(ERROR,
797 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
798 							 errmsg("parallel worker failed to initialize"),
799 							 errhint("More details may be available in the server log.")));
800 
801 				/*
802 				 * The worker is stopped, but is attached to the error queue.
803 				 * Unless there's a bug somewhere, this will only happen when
804 				 * the worker writes messages and terminates after the
805 				 * CHECK_FOR_INTERRUPTS() near the top of this function and
806 				 * before the call to GetBackgroundWorkerPid().  In that case,
807 				 * or latch should have been set as well and the right things
808 				 * will happen on the next pass through the loop.
809 				 */
810 			}
811 		}
812 
813 		WaitLatch(MyLatch, WL_LATCH_SET, -1,
814 				  WAIT_EVENT_PARALLEL_FINISH);
815 		ResetLatch(MyLatch);
816 	}
817 
818 	if (pcxt->toc != NULL)
819 	{
820 		FixedParallelState *fps;
821 
822 		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
823 		if (fps->last_xlog_end > XactLastRecEnd)
824 			XactLastRecEnd = fps->last_xlog_end;
825 	}
826 }
827 
828 /*
829  * Wait for all workers to exit.
830  *
831  * This function ensures that workers have been completely shutdown.  The
832  * difference between WaitForParallelWorkersToFinish and this function is
833  * that former just ensures that last message sent by worker backend is
834  * received by master backend whereas this ensures the complete shutdown.
835  */
836 static void
WaitForParallelWorkersToExit(ParallelContext * pcxt)837 WaitForParallelWorkersToExit(ParallelContext *pcxt)
838 {
839 	int			i;
840 
841 	/* Wait until the workers actually die. */
842 	for (i = 0; i < pcxt->nworkers_launched; ++i)
843 	{
844 		BgwHandleStatus status;
845 
846 		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
847 			continue;
848 
849 		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);
850 
851 		/*
852 		 * If the postmaster kicked the bucket, we have no chance of cleaning
853 		 * up safely -- we won't be able to tell when our workers are actually
854 		 * dead.  This doesn't necessitate a PANIC since they will all abort
855 		 * eventually, but we can't safely continue this session.
856 		 */
857 		if (status == BGWH_POSTMASTER_DIED)
858 			ereport(FATAL,
859 					(errcode(ERRCODE_ADMIN_SHUTDOWN),
860 					 errmsg("postmaster exited during a parallel transaction")));
861 
862 		/* Release memory. */
863 		pfree(pcxt->worker[i].bgwhandle);
864 		pcxt->worker[i].bgwhandle = NULL;
865 	}
866 }
867 
/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function.  When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 *
 * On return, pcxt itself has been pfree'd, together with its worker array,
 * its library/function name strings and any backend-private memory, so the
 * caller must not touch it again.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
	int			i;

	/*
	 * Be careful about order of operations here!  We remove the parallel
	 * context from the list before we do anything else; otherwise, if an
	 * error occurs during a subsequent step, we might try to nuke it again
	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
	 */
	dlist_delete(&pcxt->node);

	/*
	 * Kill each worker in turn, and forget their error queues.  Only workers
	 * whose error queue is still attached are terminated; a NULL error_mqh
	 * means the queue was already detached (for instance after the worker's
	 * Terminate message was handled in HandleParallelMessage).
	 */
	if (pcxt->worker != NULL)
	{
		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			if (pcxt->worker[i].error_mqh != NULL)
			{
				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
			}
		}
	}

	/*
	 * If we have allocated a shared memory segment, detach it.  This will
	 * implicitly detach the error queues, and any other shared memory queues,
	 * stored there.
	 */
	if (pcxt->seg != NULL)
	{
		dsm_detach(pcxt->seg);
		pcxt->seg = NULL;
	}

	/*
	 * If this parallel context is actually in backend-private memory rather
	 * than shared memory, free that memory instead.
	 */
	if (pcxt->private_memory != NULL)
	{
		pfree(pcxt->private_memory);
		pcxt->private_memory = NULL;
	}

	/*
	 * We can't finish transaction commit or abort until all of the workers
	 * have exited.  This means, in particular, that we can't respond to
	 * interrupts at this stage.
	 */
	HOLD_INTERRUPTS();
	WaitForParallelWorkersToExit(pcxt);
	RESUME_INTERRUPTS();

	/*
	 * Free the worker array itself.  This must come after the wait above,
	 * which still reads each worker's bgwhandle from this array.
	 */
	if (pcxt->worker != NULL)
	{
		pfree(pcxt->worker);
		pcxt->worker = NULL;
	}

	/* Free memory. */
	pfree(pcxt->library_name);
	pfree(pcxt->function_name);
	pfree(pcxt);
}
946 
947 /*
948  * Are there any parallel contexts currently active?
949  */
950 bool
ParallelContextActive(void)951 ParallelContextActive(void)
952 {
953 	return !dlist_is_empty(&pcxt_list);
954 }
955 
/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler!  All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * HandleParallelMessages().
 *
 * Both the generic InterruptPending flag and the specific
 * ParallelMessagePending flag are raised.  The latch is then set so that a
 * process currently sleeping in WaitLatch() wakes up and notices the
 * pending message.
 */
void
HandleParallelMessageInterrupt(void)
{
	InterruptPending = true;
	ParallelMessagePending = true;
	SetLatch(MyLatch);
}
970 
971 /*
972  * Handle any queued protocol messages received from parallel workers.
973  */
974 void
HandleParallelMessages(void)975 HandleParallelMessages(void)
976 {
977 	dlist_iter	iter;
978 	MemoryContext oldcontext;
979 
980 	static MemoryContext hpm_context = NULL;
981 
982 	/*
983 	 * This is invoked from ProcessInterrupts(), and since some of the
984 	 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential
985 	 * for recursive calls if more signals are received while this runs.  It's
986 	 * unclear that recursive entry would be safe, and it doesn't seem useful
987 	 * even if it is safe, so let's block interrupts until done.
988 	 */
989 	HOLD_INTERRUPTS();
990 
991 	/*
992 	 * Moreover, CurrentMemoryContext might be pointing almost anywhere.  We
993 	 * don't want to risk leaking data into long-lived contexts, so let's do
994 	 * our work here in a private context that we can reset on each use.
995 	 */
996 	if (hpm_context == NULL)	/* first time through? */
997 		hpm_context = AllocSetContextCreate(TopMemoryContext,
998 											"HandleParallelMessages",
999 											ALLOCSET_DEFAULT_SIZES);
1000 	else
1001 		MemoryContextReset(hpm_context);
1002 
1003 	oldcontext = MemoryContextSwitchTo(hpm_context);
1004 
1005 	/* OK to process messages.  Reset the flag saying there are more to do. */
1006 	ParallelMessagePending = false;
1007 
1008 	dlist_foreach(iter, &pcxt_list)
1009 	{
1010 		ParallelContext *pcxt;
1011 		int			i;
1012 
1013 		pcxt = dlist_container(ParallelContext, node, iter.cur);
1014 		if (pcxt->worker == NULL)
1015 			continue;
1016 
1017 		for (i = 0; i < pcxt->nworkers_launched; ++i)
1018 		{
1019 			/*
1020 			 * Read as many messages as we can from each worker, but stop when
1021 			 * either (1) the worker's error queue goes away, which can happen
1022 			 * if we receive a Terminate message from the worker; or (2) no
1023 			 * more messages can be read from the worker without blocking.
1024 			 */
1025 			while (pcxt->worker[i].error_mqh != NULL)
1026 			{
1027 				shm_mq_result res;
1028 				Size		nbytes;
1029 				void	   *data;
1030 
1031 				res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes,
1032 									 &data, true);
1033 				if (res == SHM_MQ_WOULD_BLOCK)
1034 					break;
1035 				else if (res == SHM_MQ_SUCCESS)
1036 				{
1037 					StringInfoData msg;
1038 
1039 					initStringInfo(&msg);
1040 					appendBinaryStringInfo(&msg, data, nbytes);
1041 					HandleParallelMessage(pcxt, i, &msg);
1042 					pfree(msg.data);
1043 				}
1044 				else
1045 					ereport(ERROR,
1046 							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1047 							 errmsg("lost connection to parallel worker")));
1048 			}
1049 		}
1050 	}
1051 
1052 	MemoryContextSwitchTo(oldcontext);
1053 
1054 	/* Might as well clear the context on our way out */
1055 	MemoryContextReset(hpm_context);
1056 
1057 	RESUME_INTERRUPTS();
1058 }
1059 
1060 /*
1061  * Handle a single protocol message received from a single parallel worker.
1062  */
1063 static void
HandleParallelMessage(ParallelContext * pcxt,int i,StringInfo msg)1064 HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg)
1065 {
1066 	char		msgtype;
1067 
1068 	if (pcxt->known_attached_workers != NULL &&
1069 		!pcxt->known_attached_workers[i])
1070 	{
1071 		pcxt->known_attached_workers[i] = true;
1072 		pcxt->nknown_attached_workers++;
1073 	}
1074 
1075 	msgtype = pq_getmsgbyte(msg);
1076 
1077 	switch (msgtype)
1078 	{
1079 		case 'K':				/* BackendKeyData */
1080 			{
1081 				int32		pid = pq_getmsgint(msg, 4);
1082 
1083 				(void) pq_getmsgint(msg, 4);	/* discard cancel key */
1084 				(void) pq_getmsgend(msg);
1085 				pcxt->worker[i].pid = pid;
1086 				break;
1087 			}
1088 
1089 		case 'E':				/* ErrorResponse */
1090 		case 'N':				/* NoticeResponse */
1091 			{
1092 				ErrorData	edata;
1093 				ErrorContextCallback *save_error_context_stack;
1094 
1095 				/* Parse ErrorResponse or NoticeResponse. */
1096 				pq_parse_errornotice(msg, &edata);
1097 
1098 				/* Death of a worker isn't enough justification for suicide. */
1099 				edata.elevel = Min(edata.elevel, ERROR);
1100 
1101 				/*
1102 				 * If desired, add a context line to show that this is a
1103 				 * message propagated from a parallel worker.  Otherwise, it
1104 				 * can sometimes be confusing to understand what actually
1105 				 * happened.  (We don't do this in FORCE_PARALLEL_REGRESS mode
1106 				 * because it causes test-result instability depending on
1107 				 * whether a parallel worker is actually used or not.)
1108 				 */
1109 				if (force_parallel_mode != FORCE_PARALLEL_REGRESS)
1110 				{
1111 					if (edata.context)
1112 						edata.context = psprintf("%s\n%s", edata.context,
1113 												 _("parallel worker"));
1114 					else
1115 						edata.context = pstrdup(_("parallel worker"));
1116 				}
1117 
1118 				/*
1119 				 * Context beyond that should use the error context callbacks
1120 				 * that were in effect when the ParallelContext was created,
1121 				 * not the current ones.
1122 				 */
1123 				save_error_context_stack = error_context_stack;
1124 				error_context_stack = pcxt->error_context_stack;
1125 
1126 				/* Rethrow error or print notice. */
1127 				ThrowErrorData(&edata);
1128 
1129 				/* Not an error, so restore previous context stack. */
1130 				error_context_stack = save_error_context_stack;
1131 
1132 				break;
1133 			}
1134 
1135 		case 'A':				/* NotifyResponse */
1136 			{
1137 				/* Propagate NotifyResponse. */
1138 				int32		pid;
1139 				const char *channel;
1140 				const char *payload;
1141 
1142 				pid = pq_getmsgint(msg, 4);
1143 				channel = pq_getmsgrawstring(msg);
1144 				payload = pq_getmsgrawstring(msg);
1145 				pq_endmessage(msg);
1146 
1147 				NotifyMyFrontEnd(channel, payload, pid);
1148 
1149 				break;
1150 			}
1151 
1152 		case 'X':				/* Terminate, indicating clean exit */
1153 			{
1154 				shm_mq_detach(pcxt->worker[i].error_mqh);
1155 				pcxt->worker[i].error_mqh = NULL;
1156 				break;
1157 			}
1158 
1159 		default:
1160 			{
1161 				elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)",
1162 					 msgtype, msg->len);
1163 			}
1164 	}
1165 }
1166 
1167 /*
1168  * End-of-subtransaction cleanup for parallel contexts.
1169  *
1170  * Currently, it's forbidden to enter or leave a subtransaction while
1171  * parallel mode is in effect, so we could just blow away everything.  But
1172  * we may want to relax that restriction in the future, so this code
1173  * contemplates that there may be multiple subtransaction IDs in pcxt_list.
1174  */
1175 void
AtEOSubXact_Parallel(bool isCommit,SubTransactionId mySubId)1176 AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
1177 {
1178 	while (!dlist_is_empty(&pcxt_list))
1179 	{
1180 		ParallelContext *pcxt;
1181 
1182 		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1183 		if (pcxt->subid != mySubId)
1184 			break;
1185 		if (isCommit)
1186 			elog(WARNING, "leaked parallel context");
1187 		DestroyParallelContext(pcxt);
1188 	}
1189 }
1190 
1191 /*
1192  * End-of-transaction cleanup for parallel contexts.
1193  */
1194 void
AtEOXact_Parallel(bool isCommit)1195 AtEOXact_Parallel(bool isCommit)
1196 {
1197 	while (!dlist_is_empty(&pcxt_list))
1198 	{
1199 		ParallelContext *pcxt;
1200 
1201 		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
1202 		if (isCommit)
1203 			elog(WARNING, "leaked parallel context");
1204 		DestroyParallelContext(pcxt);
1205 	}
1206 }
1207 
/*
 * Main entrypoint for parallel workers.
 *
 * main_arg carries the handle of the dynamic shared memory segment to attach
 * to (it is passed to dsm_attach()).  The worker performs these steps:
 *   - reads the segment's table of contents;
 *   - redirects its protocol output into its per-worker error queue, so that
 *     messages reach the leader;
 *   - rebuilds the leader's backend-local state: lock group, timestamps,
 *     database connection, libraries, GUCs, transaction, combo CIDs, session,
 *     snapshots, role/security context, temp namespace and reindex state;
 *   - finally invokes the entry point named in the table of contents.
 *
 * On success a Terminate ('X') message is sent to the leader.  If the worker
 * cannot join the leader's lock group it returns early, without running the
 * entry point or sending 'X'.  Errors raised before the error queue is
 * attached are not reported back to the leader.
 */
void
ParallelWorkerMain(Datum main_arg)
{
	dsm_segment *seg;
	shm_toc    *toc;
	FixedParallelState *fps;
	char	   *error_queue_space;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	char	   *libraryspace;
	char	   *entrypointstate;
	char	   *library_name;
	char	   *function_name;
	parallel_worker_main_type entrypt;
	char	   *gucspace;
	char	   *combocidspace;
	char	   *tsnapspace;
	char	   *asnapspace;
	char	   *tstatespace;
	char	   *reindexspace;
	StringInfoData msgbuf;
	char	   *session_dsm_handle_space;
	Snapshot	tsnapshot;
	Snapshot	asnapshot;

	/* Set flag to indicate that we're initializing a parallel worker. */
	InitializingParallelWorker = true;

	/* Establish signal handlers. */
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * Determine and set our parallel worker number, which is passed to us in
	 * the bgw_extra area of our background worker entry.
	 */
	Assert(ParallelWorkerNumber == -1);
	memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

	/* Set up a memory context and resource owner. */
	Assert(CurrentResourceOwner == NULL);
	CurrentResourceOwner = ResourceOwnerCreate(NULL, "parallel toplevel");
	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
												 "Parallel worker",
												 ALLOCSET_DEFAULT_SIZES);

	/*
	 * Now that we have a resource owner, we can attach to the dynamic shared
	 * memory segment and read the table of contents.
	 */
	seg = dsm_attach(DatumGetUInt32(main_arg));
	if (seg == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not map dynamic shared memory segment")));
	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
	if (toc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid magic number in dynamic shared memory segment")));

	/* Look up fixed parallel state. */
	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
	MyFixedParallelState = fps;

	/* Arrange to signal the leader if we exit. */
	ParallelMasterPid = fps->parallel_master_pid;
	ParallelMasterBackendId = fps->parallel_master_backend_id;
	on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

	/*
	 * Now we can find and attach to the error queue provided for us.  That's
	 * good, because until we do that, any errors that happen here will not be
	 * reported back to the process that requested that this worker be
	 * launched.
	 *
	 * The error queues are laid out back to back, one slot of
	 * PARALLEL_ERROR_QUEUE_SIZE bytes per worker, indexed by worker number.
	 */
	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
	mq = (shm_mq *) (error_queue_space +
					 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
	shm_mq_set_sender(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);
	pq_redirect_to_shm_mq(seg, mqh);
	pq_set_parallel_master(fps->parallel_master_pid,
						   fps->parallel_master_backend_id);

	/*
	 * Send a BackendKeyData message to the process that initiated parallelism
	 * so that it has access to our PID before it receives any other messages
	 * from us.  Our cancel key is sent, too, since that's the way the
	 * protocol message is defined, but it won't actually be used for anything
	 * in this case.
	 */
	pq_beginmessage(&msgbuf, 'K');
	pq_sendint32(&msgbuf, (int32) MyProcPid);
	pq_sendint32(&msgbuf, (int32) MyCancelKey);
	pq_endmessage(&msgbuf);

	/*
	 * Hooray! Primary initialization is complete.  Now, we need to set up our
	 * backend-local state to match the original backend.
	 */

	/*
	 * Join locking group.  We must do this before anything that could try to
	 * acquire a heavyweight lock, because any heavyweight locks acquired to
	 * this point could block either directly against the parallel group
	 * leader or against some process which in turn waits for a lock that
	 * conflicts with the parallel group leader, causing an undetected
	 * deadlock.  (If we can't join the lock group, the leader has gone away,
	 * so just exit quietly.)
	 */
	if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
							   fps->parallel_master_pid))
		return;

	/*
	 * Restore transaction and statement start-time timestamps.  This must
	 * happen before anything that would start a transaction, else asserts in
	 * xact.c will fire.
	 */
	SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);

	/*
	 * Identify the entry point to be called.  In theory this could result in
	 * loading an additional library, though most likely the entry point is in
	 * the core backend or in a library we just loaded.
	 *
	 * The entrypoint state holds the library name, and the function name
	 * starts right after the library name's terminating NUL.
	 */
	entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
	library_name = entrypointstate;
	function_name = entrypointstate + strlen(library_name) + 1;

	entrypt = LookupParallelWorkerFunction(library_name, function_name);

	/* Restore database connection. */
	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
											  fps->authenticated_user_id,
											  0);

	/*
	 * Set the client encoding to the database encoding, since that is what
	 * the leader will expect.
	 */
	SetClientEncoding(GetDatabaseEncoding());

	/*
	 * Load libraries that were loaded by original backend.  We want to do
	 * this before restoring GUCs, because the libraries might define custom
	 * variables.  Both steps run inside a short-lived transaction that is
	 * committed before the parallel worker transaction is started below.
	 */
	libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
	StartTransactionCommand();
	RestoreLibraryState(libraryspace);

	/* Restore GUC values from launching backend. */
	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
	RestoreGUCState(gucspace);
	CommitTransactionCommand();

	/* Crank up a transaction state appropriate to a parallel worker. */
	tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
	StartParallelWorkerTransaction(tstatespace);

	/* Restore combo CID state. */
	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
	RestoreComboCIDState(combocidspace);

	/* Attach to the per-session DSM segment and contained objects. */
	session_dsm_handle_space =
		shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
	AttachSession(*(dsm_handle *) session_dsm_handle_space);

	/*
	 * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
	 * the leader has serialized the transaction snapshot and we must restore
	 * it. At lower isolation levels, there is no transaction-lifetime
	 * snapshot, but we need TransactionXmin to get set to a value which is
	 * less than or equal to the xmin of every snapshot that will be used by
	 * this worker. The easiest way to accomplish that is to install the
	 * active snapshot as the transaction snapshot. Code running in this
	 * parallel worker might take new snapshots via GetTransactionSnapshot()
	 * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
	 * snapshot older than the active snapshot.
	 *
	 * Hence the transaction snapshot lookup below passes missing_ok = true,
	 * and the active snapshot is used in its place when it is absent.
	 */
	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
	asnapshot = RestoreSnapshot(asnapspace);
	tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
	RestoreTransactionSnapshot(tsnapshot,
							   fps->parallel_master_pgproc);
	PushActiveSnapshot(asnapshot);

	/*
	 * We've changed which tuples we can see, and must therefore invalidate
	 * system caches.
	 */
	InvalidateSystemCaches();

	/*
	 * Restore current role id.  Skip verifying whether session user is
	 * allowed to become this role and blindly restore the leader's state for
	 * current role.
	 */
	SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);

	/* Restore user ID and security context. */
	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);

	/* Restore temp-namespace state to ensure search path matches leader's. */
	SetTempNamespaceState(fps->temp_namespace_id,
						  fps->temp_toast_namespace_id);

	/* Restore reindex state. */
	reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
	RestoreReindexState(reindexspace);

	/*
	 * We've initialized all of our state now; nothing should change
	 * hereafter.
	 */
	InitializingParallelWorker = false;
	EnterParallelMode();

	/*
	 * Time to do the real work: invoke the caller-supplied code.
	 */
	entrypt(seg, toc);

	/*
	 * Teardown mirrors setup in reverse.  First leave parallel mode, since we
	 * must exit it before we can pop the active snapshot.
	 */
	ExitParallelMode();

	/* Must pop active snapshot so resowner.c doesn't complain. */
	PopActiveSnapshot();

	/* Shut down the parallel-worker transaction. */
	EndParallelWorkerTransaction();

	/* Detach from the per-session DSM segment. */
	DetachSession();

	/*
	 * Report success.  The leader handles 'X' by detaching this worker's
	 * error queue.
	 */
	pq_putmessage('X', NULL, 0);
}
1450 
1451 /*
1452  * Update shared memory with the ending location of the last WAL record we
1453  * wrote, if it's greater than the value already stored there.
1454  */
1455 void
ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)1456 ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end)
1457 {
1458 	FixedParallelState *fps = MyFixedParallelState;
1459 
1460 	Assert(fps != NULL);
1461 	SpinLockAcquire(&fps->mutex);
1462 	if (fps->last_xlog_end < last_xlog_end)
1463 		fps->last_xlog_end = last_xlog_end;
1464 	SpinLockRelease(&fps->mutex);
1465 }
1466 
1467 /*
1468  * Make sure the leader tries to read from our error queue one more time.
1469  * This guards against the case where we exit uncleanly without sending an
1470  * ErrorResponse to the leader, for example because some code calls proc_exit
1471  * directly.
1472  */
1473 static void
ParallelWorkerShutdown(int code,Datum arg)1474 ParallelWorkerShutdown(int code, Datum arg)
1475 {
1476 	SendProcSignal(ParallelMasterPid,
1477 				   PROCSIG_PARALLEL_MESSAGE,
1478 				   ParallelMasterBackendId);
1479 }
1480 
1481 /*
1482  * Look up (and possibly load) a parallel worker entry point function.
1483  *
1484  * For functions contained in the core code, we use library name "postgres"
1485  * and consult the InternalParallelWorkers array.  External functions are
1486  * looked up, and loaded if necessary, using load_external_function().
1487  *
1488  * The point of this is to pass function names as strings across process
1489  * boundaries.  We can't pass actual function addresses because of the
1490  * possibility that the function has been loaded at a different address
1491  * in a different process.  This is obviously a hazard for functions in
1492  * loadable libraries, but it can happen even for functions in the core code
1493  * on platforms using EXEC_BACKEND (e.g., Windows).
1494  *
1495  * At some point it might be worthwhile to get rid of InternalParallelWorkers[]
1496  * in favor of applying load_external_function() for core functions too;
1497  * but that raises portability issues that are not worth addressing now.
1498  */
1499 static parallel_worker_main_type
LookupParallelWorkerFunction(const char * libraryname,const char * funcname)1500 LookupParallelWorkerFunction(const char *libraryname, const char *funcname)
1501 {
1502 	/*
1503 	 * If the function is to be loaded from postgres itself, search the
1504 	 * InternalParallelWorkers array.
1505 	 */
1506 	if (strcmp(libraryname, "postgres") == 0)
1507 	{
1508 		int			i;
1509 
1510 		for (i = 0; i < lengthof(InternalParallelWorkers); i++)
1511 		{
1512 			if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0)
1513 				return InternalParallelWorkers[i].fn_addr;
1514 		}
1515 
1516 		/* We can only reach this by programming error. */
1517 		elog(ERROR, "internal function \"%s\" not found", funcname);
1518 	}
1519 
1520 	/* Otherwise load from external library. */
1521 	return (parallel_worker_main_type)
1522 		load_external_function(libraryname, funcname, true, NULL);
1523 }
1524