/*-------------------------------------------------------------------------
 *
 * parallel.c
 *	  Infrastructure for launching parallel workers
 *
 * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * IDENTIFICATION
 *	  src/backend/access/transam/parallel.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"

#include "access/nbtree.h"
#include "access/parallel.h"
#include "access/session.h"
#include "access/xact.h"
#include "access/xlog.h"
#include "catalog/pg_enum.h"
#include "catalog/index.h"
#include "catalog/namespace.h"
#include "commands/async.h"
#include "executor/execParallel.h"
#include "libpq/libpq.h"
#include "libpq/pqformat.h"
#include "libpq/pqmq.h"
#include "miscadmin.h"
#include "optimizer/optimizer.h"
#include "pgstat.h"
#include "storage/ipc.h"
#include "storage/predicate.h"
#include "storage/sinval.h"
#include "storage/spin.h"
#include "tcop/tcopprot.h"
#include "utils/combocid.h"
#include "utils/guc.h"
#include "utils/inval.h"
#include "utils/memutils.h"
#include "utils/relmapper.h"
#include "utils/snapmgr.h"
#include "utils/typcache.h"


/*
 * We don't want to waste a lot of memory on an error queue which, most of
 * the time, will process only a handful of small messages.  However, it is
 * desirable to make it large enough that a typical ErrorResponse can be sent
 * without blocking.  That way, a worker that errors out can write the whole
 * message into the queue and terminate without waiting for the user backend.
 *
 * One queue of this size is carved out per worker, back to back in a single
 * TOC chunk, so the value must stay BUFFERALIGN'ed (InitializeParallelDSM
 * checks this with a static assertion).
 */
#define PARALLEL_ERROR_QUEUE_SIZE			16384

/*
 * Magic number for parallel context TOC.  Used for both the DSM-backed TOC
 * and the backend-private fallback TOC created by InitializeParallelDSM.
 */
#define PARALLEL_MAGIC						0x50477c7c

/*
 * Magic numbers for per-context parallel state sharing.  Higher-level code
 * should use smaller values, leaving these very large ones for use by this
 * module.  Each key identifies one chunk inserted into the TOC by
 * InitializeParallelDSM.
 */
#define PARALLEL_KEY_FIXED					UINT64CONST(0xFFFFFFFFFFFF0001)
#define PARALLEL_KEY_ERROR_QUEUE			UINT64CONST(0xFFFFFFFFFFFF0002)
#define PARALLEL_KEY_LIBRARY				UINT64CONST(0xFFFFFFFFFFFF0003)
#define PARALLEL_KEY_GUC					UINT64CONST(0xFFFFFFFFFFFF0004)
#define PARALLEL_KEY_COMBO_CID				UINT64CONST(0xFFFFFFFFFFFF0005)
#define PARALLEL_KEY_TRANSACTION_SNAPSHOT	UINT64CONST(0xFFFFFFFFFFFF0006)
#define PARALLEL_KEY_ACTIVE_SNAPSHOT		UINT64CONST(0xFFFFFFFFFFFF0007)
#define PARALLEL_KEY_TRANSACTION_STATE		UINT64CONST(0xFFFFFFFFFFFF0008)
#define PARALLEL_KEY_ENTRYPOINT				UINT64CONST(0xFFFFFFFFFFFF0009)
#define PARALLEL_KEY_SESSION_DSM			UINT64CONST(0xFFFFFFFFFFFF000A)
#define PARALLEL_KEY_REINDEX_STATE			UINT64CONST(0xFFFFFFFFFFFF000B)
#define PARALLEL_KEY_RELMAPPER_STATE		UINT64CONST(0xFFFFFFFFFFFF000C)
#define PARALLEL_KEY_ENUMBLACKLIST			UINT64CONST(0xFFFFFFFFFFFF000D)

/*
 * Fixed-size parallel state.
 *
 * The leader fills this in within InitializeParallelDSM and stores it under
 * PARALLEL_KEY_FIXED.  It is created even when no workers are planned, so
 * code that looks up PARALLEL_KEY_FIXED can rely on it being present.
 */
typedef struct FixedParallelState
{
	/* Fixed-size state that workers must restore. */
	Oid			database_id;
	Oid			authenticated_user_id;
	Oid			current_user_id;
	Oid			outer_user_id;
	Oid			temp_namespace_id;
	Oid			temp_toast_namespace_id;
	int			sec_context;
	bool		is_superuser;
	PGPROC	   *parallel_master_pgproc;
	pid_t		parallel_master_pid;
	BackendId	parallel_master_backend_id;
	TimestampTz xact_ts;
	TimestampTz stmt_ts;
	SerializableXactHandle serializable_xact_handle;

	/* Mutex protects remaining fields. */
	slock_t		mutex;

	/*
	 * Maximum XactLastRecEnd of any worker.  The leader folds this into its
	 * own XactLastRecEnd in WaitForParallelWorkersToFinish; presumably
	 * workers raise it (under mutex) as they finish -- that code is not in
	 * this part of the file.
	 */
	XLogRecPtr	last_xlog_end;
} FixedParallelState;

/*
 * Our parallel worker number.  We initialize this to -1, meaning that we are
 * not a parallel worker.  In parallel workers, it will be set to a value >= 0
 * and < the number of workers before any user code is invoked; each parallel
 * worker will get a different parallel worker number.
 */
int			ParallelWorkerNumber = -1;

/*
 * Is there a parallel message pending which we need to receive?
 *
 * Set by HandleParallelMessageInterrupt(), which runs inside a signal
 * handler; hence volatile.
 */
volatile bool ParallelMessagePending = false;

/* Are we initializing a parallel worker? */
bool		InitializingParallelWorker = false;

/* Pointer to our fixed parallel state. */
static FixedParallelState *MyFixedParallelState;

/*
 * List of active parallel contexts.  CreateParallelContext() pushes onto
 * this list and DestroyParallelContext() unlinks from it.
 */
static dlist_head pcxt_list = DLIST_STATIC_INIT(pcxt_list);

/* Backend-local copy of data from FixedParallelState. */
static pid_t ParallelMasterPid;

/*
 * List of internal parallel worker entry points.  We need this for
 * reasons explained in LookupParallelWorkerFunction(), below.
 *
 * Entry points are passed between processes by name, not by address (see
 * the entrypoint serialization in InitializeParallelDSM).  This table
 * presumably maps those names back to core-backend addresses when the
 * library name is "postgres" -- confirm in LookupParallelWorkerFunction().
 */
static const struct
{
	const char *fn_name;
	parallel_worker_main_type fn_addr;
}			InternalParallelWorkers[] =

{
	{
		"ParallelQueryMain", ParallelQueryMain
	},
	{
		"_bt_parallel_build_main", _bt_parallel_build_main
	}
};

/* Private functions. */
static void HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg);
static void WaitForParallelWorkersToExit(ParallelContext *pcxt);
static parallel_worker_main_type LookupParallelWorkerFunction(const char *libraryname, const char *funcname);
static void ParallelWorkerShutdown(int code, Datum arg);


/*
 * Establish a new parallel context.  This should be done after entering
 * parallel mode, and (unless there is an error) the context should be
 * destroyed before exiting the current subtransaction.
 *
 * library_name and function_name identify the entry point the workers will
 * run; both strings are copied.  nworkers is the number of workers requested
 * and must be >= 0.  Later steps may lower pcxt->nworkers, and only
 * pcxt->nworkers_launched reflects how many workers were actually
 * registered.  The context is allocated in TopTransactionContext.
*/
ParallelContext *
CreateParallelContext(const char *library_name, const char *function_name,
					  int nworkers)
{
	MemoryContext oldcontext;
	ParallelContext *pcxt;

	/* It is unsafe to create a parallel context if not in parallel mode. */
	Assert(IsInParallelMode());

	/* Number of workers should be non-negative. */
	Assert(nworkers >= 0);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/*
	 * Initialize a new ParallelContext.  palloc0 leaves seg, toc, worker,
	 * private_memory and the launch counters NULL/zero; later code tests
	 * those fields to decide what needs doing or cleaning up.
	 */
	pcxt = palloc0(sizeof(ParallelContext));
	pcxt->subid = GetCurrentSubTransactionId();
	pcxt->nworkers = nworkers;
	pcxt->library_name = pstrdup(library_name);
	pcxt->function_name = pstrdup(function_name);
	pcxt->error_context_stack = error_context_stack;
	shm_toc_initialize_estimator(&pcxt->estimator);
	dlist_push_head(&pcxt_list, &pcxt->node);

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);

	return pcxt;
}

/*
 * Establish the dynamic shared memory segment for a parallel context and
 * copy state and other bookkeeping information that will be needed by
 * parallel workers into it.
 *
 * On return pcxt->toc is always valid and always contains the
 * FixedParallelState.  pcxt->nworkers is forced to 0 if no per-session DSM
 * handle can be obtained or no DSM segment can be created.  In that case the
 * TOC lives in backend-private memory (pcxt->private_memory) and none of the
 * worker-only state below is serialized.
 */
void
InitializeParallelDSM(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	Size		library_len = 0;
	Size		guc_len = 0;
	Size		combocidlen = 0;
	Size		tsnaplen = 0;
	Size		asnaplen = 0;
	Size		tstatelen = 0;
	Size		reindexlen = 0;
	Size		relmapperlen = 0;
	Size		enumblacklistlen = 0;
	Size		segsize = 0;
	int			i;
	FixedParallelState *fps;
	dsm_handle	session_dsm_handle = DSM_HANDLE_INVALID;
	Snapshot	transaction_snapshot = GetTransactionSnapshot();
	Snapshot	active_snapshot = GetActiveSnapshot();

	/* We might be running in a very short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/* Allow space to store the fixed-size parallel state. */
	shm_toc_estimate_chunk(&pcxt->estimator, sizeof(FixedParallelState));
	shm_toc_estimate_keys(&pcxt->estimator, 1);

	/*
	 * Normally, the user will have requested at least one worker process, but
	 * if by chance they have not, we can skip a bunch of things here.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Get (or create) the per-session DSM segment's handle. */
		session_dsm_handle = GetSessionDsmHandle();

		/*
		 * If we weren't able to create a per-session DSM segment, then we can
		 * continue but we can't safely launch any workers because their
		 * record typmods would be incompatible so they couldn't exchange
		 * tuples.
		 */
		if (session_dsm_handle == DSM_HANDLE_INVALID)
			pcxt->nworkers = 0;
	}

	/*
	 * Phase 1: size everything.  Every chunk estimated here must be matched
	 * by exactly one shm_toc_allocate() below, with the same length.
	 */
	if (pcxt->nworkers > 0)
	{
		/* Estimate space for various kinds of state sharing. */
		library_len = EstimateLibraryStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, library_len);
		guc_len = EstimateGUCStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, guc_len);
		combocidlen = EstimateComboCIDStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, combocidlen);
		if (IsolationUsesXactSnapshot())
		{
			tsnaplen = EstimateSnapshotSpace(transaction_snapshot);
			shm_toc_estimate_chunk(&pcxt->estimator, tsnaplen);
		}
		asnaplen = EstimateSnapshotSpace(active_snapshot);
		shm_toc_estimate_chunk(&pcxt->estimator, asnaplen);
		tstatelen = EstimateTransactionStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, tstatelen);
		shm_toc_estimate_chunk(&pcxt->estimator, sizeof(dsm_handle));
		reindexlen = EstimateReindexStateSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, reindexlen);
		relmapperlen = EstimateRelationMapSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, relmapperlen);
		enumblacklistlen = EstimateEnumBlacklistSpace();
		shm_toc_estimate_chunk(&pcxt->estimator, enumblacklistlen);

		/*
		 * If you add more chunks here, you probably need to add keys.
		 *
		 * The 10 keys are LIBRARY, GUC, COMBO_CID, TRANSACTION_SNAPSHOT,
		 * ACTIVE_SNAPSHOT, SESSION_DSM, TRANSACTION_STATE, REINDEX_STATE,
		 * RELMAPPER_STATE and ENUMBLACKLIST.  TRANSACTION_SNAPSHOT is only
		 * inserted when IsolationUsesXactSnapshot(), so this may
		 * overestimate by one.
		 */
		shm_toc_estimate_keys(&pcxt->estimator, 10);

		/* Estimate space needed for error queues. */
		StaticAssertStmt(BUFFERALIGN(PARALLEL_ERROR_QUEUE_SIZE) ==
						 PARALLEL_ERROR_QUEUE_SIZE,
						 "parallel error queue size not buffer-aligned");
		shm_toc_estimate_chunk(&pcxt->estimator,
							   mul_size(PARALLEL_ERROR_QUEUE_SIZE,
										pcxt->nworkers));
		shm_toc_estimate_keys(&pcxt->estimator, 1);

		/*
		 * Estimate how much we'll need for the entrypoint info: both names
		 * plus their two terminating NULs.
		 */
		shm_toc_estimate_chunk(&pcxt->estimator, strlen(pcxt->library_name) +
							   strlen(pcxt->function_name) + 2);
		shm_toc_estimate_keys(&pcxt->estimator, 1);
	}

	/*
	 * Create DSM and initialize with new table of contents.  But if the user
	 * didn't request any workers, then don't bother creating a dynamic shared
	 * memory segment; instead, just use backend-private memory.
	 *
	 * Also, if we can't create a dynamic shared memory segment because the
	 * maximum number of segments has already been created, then fall back to
	 * backend-private memory, and plan not to use any workers.  We hope this
	 * won't happen very often, but it's better to abandon the use of
	 * parallelism than to fail outright.
	 *
	 * The private fallback memory is freed explicitly by
	 * DestroyParallelContext().
	 */
	segsize = shm_toc_estimate(&pcxt->estimator);
	if (pcxt->nworkers > 0)
		pcxt->seg = dsm_create(segsize, DSM_CREATE_NULL_IF_MAXSEGMENTS);
	if (pcxt->seg != NULL)
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC,
								   dsm_segment_address(pcxt->seg),
								   segsize);
	else
	{
		pcxt->nworkers = 0;
		pcxt->private_memory = MemoryContextAlloc(TopMemoryContext, segsize);
		pcxt->toc = shm_toc_create(PARALLEL_MAGIC, pcxt->private_memory,
								   segsize);
	}

	/*
	 * Initialize fixed-size state in shared memory.  This is done even with
	 * zero workers, because other functions look up PARALLEL_KEY_FIXED
	 * unconditionally.
	 */
	fps = (FixedParallelState *)
		shm_toc_allocate(pcxt->toc, sizeof(FixedParallelState));
	fps->database_id = MyDatabaseId;
	fps->authenticated_user_id = GetAuthenticatedUserId();
	fps->outer_user_id = GetCurrentRoleId();
	fps->is_superuser = session_auth_is_superuser;
	GetUserIdAndSecContext(&fps->current_user_id, &fps->sec_context);
	GetTempNamespaceState(&fps->temp_namespace_id,
						  &fps->temp_toast_namespace_id);
	fps->parallel_master_pgproc = MyProc;
	fps->parallel_master_pid = MyProcPid;
	fps->parallel_master_backend_id = MyBackendId;
	fps->xact_ts = GetCurrentTransactionStartTimestamp();
	fps->stmt_ts = GetCurrentStatementStartTimestamp();
	fps->serializable_xact_handle = ShareSerializableXact();
	SpinLockInit(&fps->mutex);
	fps->last_xlog_end = 0;
	shm_toc_insert(pcxt->toc, PARALLEL_KEY_FIXED, fps);

	/*
	 * Phase 2: allocate and fill each chunk estimated above.
	 *
	 * We can skip the rest of this if we're not budgeting for any workers.
	 */
	if (pcxt->nworkers > 0)
	{
		char	   *libraryspace;
		char	   *gucspace;
		char	   *combocidspace;
		char	   *tsnapspace;
		char	   *asnapspace;
		char	   *tstatespace;
		char	   *reindexspace;
		char	   *relmapperspace;
		char	   *error_queue_space;
		char	   *session_dsm_handle_space;
		char	   *entrypointstate;
		char	   *enumblacklistspace;
		Size		lnamelen;

		/* Serialize shared libraries we have loaded. */
		libraryspace = shm_toc_allocate(pcxt->toc, library_len);
		SerializeLibraryState(library_len, libraryspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_LIBRARY, libraryspace);

		/* Serialize GUC settings. */
		gucspace = shm_toc_allocate(pcxt->toc, guc_len);
		SerializeGUCState(guc_len, gucspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_GUC, gucspace);

		/* Serialize combo CID state. */
		combocidspace = shm_toc_allocate(pcxt->toc, combocidlen);
		SerializeComboCIDState(combocidlen, combocidspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_COMBO_CID, combocidspace);

		/*
		 * Serialize the transaction snapshot if the transaction
		 * isolation-level uses a transaction snapshot.
		 */
		if (IsolationUsesXactSnapshot())
		{
			tsnapspace = shm_toc_allocate(pcxt->toc, tsnaplen);
			SerializeSnapshot(transaction_snapshot, tsnapspace);
			shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT,
						   tsnapspace);
		}

		/* Serialize the active snapshot. */
		asnapspace = shm_toc_allocate(pcxt->toc, asnaplen);
		SerializeSnapshot(active_snapshot, asnapspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, asnapspace);

		/* Provide the handle for per-session segment. */
		session_dsm_handle_space = shm_toc_allocate(pcxt->toc,
													sizeof(dsm_handle));
		*(dsm_handle *) session_dsm_handle_space = session_dsm_handle;
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_SESSION_DSM,
					   session_dsm_handle_space);

		/* Serialize transaction state. */
		tstatespace = shm_toc_allocate(pcxt->toc, tstatelen);
		SerializeTransactionState(tstatelen, tstatespace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_TRANSACTION_STATE, tstatespace);

		/* Serialize reindex state. */
		reindexspace = shm_toc_allocate(pcxt->toc, reindexlen);
		SerializeReindexState(reindexlen, reindexspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_REINDEX_STATE, reindexspace);

		/* Serialize relmapper state. */
		relmapperspace = shm_toc_allocate(pcxt->toc, relmapperlen);
		SerializeRelationMap(relmapperlen, relmapperspace);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_RELMAPPER_STATE,
					   relmapperspace);

		/*
		 * Serialize enum blacklist state.  Note this serializer takes
		 * (space, length), the reverse of the ones above.
		 */
		enumblacklistspace = shm_toc_allocate(pcxt->toc, enumblacklistlen);
		SerializeEnumBlacklist(enumblacklistspace, enumblacklistlen);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENUMBLACKLIST,
					   enumblacklistspace);

		/* Allocate space for worker information. */
		pcxt->worker = palloc0(sizeof(ParallelWorkerInfo) * pcxt->nworkers);

		/*
		 * Establish error queues in dynamic shared memory.
		 *
		 * These queues should be used only for transmitting ErrorResponse,
		 * NoticeResponse, and NotifyResponse protocol messages.  Tuple data
		 * should be transmitted via separate (possibly larger?) queues.
		 *
		 * Worker i's queue starts at offset i * PARALLEL_ERROR_QUEUE_SIZE.
		 * We register as the receiver; whether a worker has attached as
		 * sender is what WaitForParallelWorkersToAttach/ToFinish check.
		 */
		error_queue_space =
			shm_toc_allocate(pcxt->toc,
							 mul_size(PARALLEL_ERROR_QUEUE_SIZE,
									  pcxt->nworkers));
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, error_queue_space);

		/*
		 * Serialize entrypoint information.  It's unsafe to pass function
		 * pointers across processes, as the function pointer may be different
		 * in each process in EXEC_BACKEND builds, so we always pass library
		 * and function name.  (We use library name "postgres" for functions
		 * in the core backend.)
		 *
		 * Layout: "library_name\0function_name\0".
		 */
		lnamelen = strlen(pcxt->library_name);
		entrypointstate = shm_toc_allocate(pcxt->toc, lnamelen +
										   strlen(pcxt->function_name) + 2);
		strcpy(entrypointstate, pcxt->library_name);
		strcpy(entrypointstate + lnamelen + 1, pcxt->function_name);
		shm_toc_insert(pcxt->toc, PARALLEL_KEY_ENTRYPOINT, entrypointstate);
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}

/*
 * Reinitialize the dynamic shared memory segment for a parallel context such
 * that we could launch workers for it again.
 *
 * Only the per-launch state is reset.  Any previously launched workers are
 * waited for, last_xlog_end is zeroed, and the error queues are recreated in
 * place.  The other serialized chunks (GUCs, snapshots, and so on) are left
 * untouched.
 */
void
ReinitializeParallelDSM(ParallelContext *pcxt)
{
	FixedParallelState *fps;

	/* Wait for any old workers to exit. */
	if (pcxt->nworkers_launched > 0)
	{
		WaitForParallelWorkersToFinish(pcxt);
		WaitForParallelWorkersToExit(pcxt);
		pcxt->nworkers_launched = 0;
		if (pcxt->known_attached_workers)
		{
			pfree(pcxt->known_attached_workers);
			pcxt->known_attached_workers = NULL;
			pcxt->nknown_attached_workers = 0;
		}
	}

	/* Reset a few bits of fixed parallel state to a clean state. */
	fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
	fps->last_xlog_end = 0;

	/*
	 * Recreate error queues (if they exist), so that the next batch of
	 * workers finds fresh queues with us as the receiver.
	 */
	if (pcxt->nworkers > 0)
	{
		char	   *error_queue_space;
		int			i;

		error_queue_space =
			shm_toc_lookup(pcxt->toc, PARALLEL_KEY_ERROR_QUEUE, false);
		for (i = 0; i < pcxt->nworkers; ++i)
		{
			char	   *start;
			shm_mq	   *mq;

			start = error_queue_space + i * PARALLEL_ERROR_QUEUE_SIZE;
			mq = shm_mq_create(start, PARALLEL_ERROR_QUEUE_SIZE);
			shm_mq_set_receiver(mq, MyProc);
			pcxt->worker[i].error_mqh = shm_mq_attach(mq, pcxt->seg, NULL);
		}
	}
}

/*
 * Launch parallel workers.
 *
 * Tries to register pcxt->nworkers dynamic background workers and counts
 * the successes in pcxt->nworkers_launched.  After the first registration
 * failure no further attempts are made, so the launched workers always
 * occupy slots 0 .. nworkers_launched - 1.  Registration does not guarantee
 * that a worker actually starts; see WaitForParallelWorkersToAttach.
 */
void
LaunchParallelWorkers(ParallelContext *pcxt)
{
	MemoryContext oldcontext;
	BackgroundWorker worker;
	int			i;
	bool		any_registrations_failed = false;

	/* Skip this if we have no workers. */
	if (pcxt->nworkers == 0)
		return;

	/* We need to be a lock group leader. */
	BecomeLockGroupLeader();

	/* If we do have workers, we'd better have a DSM segment. */
	Assert(pcxt->seg != NULL);

	/* We might be running in a short-lived memory context. */
	oldcontext = MemoryContextSwitchTo(TopTransactionContext);

	/*
	 * Configure a worker.  Every worker runs ParallelWorkerMain in the core
	 * backend and receives the DSM segment handle as its main argument.
	 */
	memset(&worker, 0, sizeof(worker));
	snprintf(worker.bgw_name, BGW_MAXLEN, "parallel worker for PID %d",
			 MyProcPid);
	snprintf(worker.bgw_type, BGW_MAXLEN, "parallel worker");
	worker.bgw_flags =
		BGWORKER_SHMEM_ACCESS | BGWORKER_BACKEND_DATABASE_CONNECTION
		| BGWORKER_CLASS_PARALLEL;
	worker.bgw_start_time = BgWorkerStart_ConsistentState;
	worker.bgw_restart_time = BGW_NEVER_RESTART;
	sprintf(worker.bgw_library_name, "postgres");
	sprintf(worker.bgw_function_name, "ParallelWorkerMain");
	worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(pcxt->seg));
	worker.bgw_notify_pid = MyProcPid;

	/*
	 * Start workers.
	 *
	 * The caller must be able to tolerate ending up with fewer workers than
	 * expected, so there is no need to throw an error here if registration
	 * fails.  It wouldn't help much anyway, because registering the worker in
	 * no way guarantees that it will start up and initialize successfully.
	 */
	for (i = 0; i < pcxt->nworkers; ++i)
	{
		/* Each worker receives its slot index through bgw_extra. */
		memcpy(worker.bgw_extra, &i, sizeof(int));
		if (!any_registrations_failed &&
			RegisterDynamicBackgroundWorker(&worker,
											&pcxt->worker[i].bgwhandle))
		{
			shm_mq_set_handle(pcxt->worker[i].error_mqh,
							  pcxt->worker[i].bgwhandle);
			pcxt->nworkers_launched++;
		}
		else
		{
			/*
			 * If we weren't able to register the worker, then we've bumped up
			 * against the max_worker_processes limit, and future
			 * registrations will probably fail too, so arrange to skip them.
			 * But we still have to execute this code for the remaining slots
			 * to make sure that we forget about the error queues we budgeted
			 * for those workers.  Otherwise, we'll wait for them to start,
			 * but they never will.
			 */
			any_registrations_failed = true;
			pcxt->worker[i].bgwhandle = NULL;
			shm_mq_detach(pcxt->worker[i].error_mqh);
			pcxt->worker[i].error_mqh = NULL;
		}
	}

	/*
	 * Now that nworkers_launched has taken its final value, we can initialize
	 * known_attached_workers.
	 */
	if (pcxt->nworkers_launched > 0)
	{
		pcxt->known_attached_workers =
			palloc0(sizeof(bool) * pcxt->nworkers_launched);
		pcxt->nknown_attached_workers = 0;
	}

	/* Restore previous memory context. */
	MemoryContextSwitchTo(oldcontext);
}

/*
 * Wait for all workers to attach to their error queues, and throw an error if
 * any worker fails to do this.
 *
 * Callers can assume that if this function returns successfully, then the
 * number of workers given by pcxt->nworkers_launched have initialized and
 * attached to their error queues.  Whether or not these workers are guaranteed
 * to still be running depends on what code the caller asked them to run;
 * this function does not guarantee that they have not exited.  However, it
 * does guarantee that any workers which exited must have done so cleanly and
 * after successfully performing the work with which they were tasked.
 *
 * If this function is not called, then some of the workers that were launched
 * may not have been started due to a fork() failure, or may have exited during
 * early startup prior to attaching to the error queue, so nworkers_launched
 * cannot be viewed as completely reliable.  It will never be less than the
 * number of workers which actually started, but it might be more.  Any workers
 * that failed to start will still be discovered by
 * WaitForParallelWorkersToFinish and an error will be thrown at that time,
 * provided that function is eventually reached.
 *
 * In general, the leader process should do as much work as possible before
 * calling this function.
fork() failures and other early-startup failures
 * are very uncommon, and having the leader sit idle when it could be doing
 * useful work is undesirable.  However, if the leader needs to wait for
 * all of its workers or for a specific worker, it may want to call this
 * function before doing so.  If not, it must make some other provision for
 * the failure-to-start case, lest it wait forever.  On the other hand, a
 * leader which never waits for a worker that might not be started yet, or
 * at least never does so prior to WaitForParallelWorkersToFinish(), need not
 * call this function at all.
 */
void
WaitForParallelWorkersToAttach(ParallelContext *pcxt)
{
	int			i;

	/* Skip this if we have no launched workers. */
	if (pcxt->nworkers_launched == 0)
		return;

	for (;;)
	{
		/*
		 * This will process any parallel messages that are pending and it may
		 * also throw an error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			BgwHandleStatus status;
			shm_mq	   *mq;
			int			rc;
			pid_t		pid;

			/* Once a worker is known to be attached, it stays that way. */
			if (pcxt->known_attached_workers[i])
				continue;

			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
			{
				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
				continue;
			}

			status = GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle, &pid);
			if (status == BGWH_STARTED)
			{
				/* Has the worker attached to the error queue? */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) != NULL)
				{
					/* Yes, so it is known to be attached. */
					pcxt->known_attached_workers[i] = true;
					++pcxt->nknown_attached_workers;
				}
			}
			else if (status == BGWH_STOPPED)
			{
				/*
				 * If the worker stopped without attaching to the error queue,
				 * throw an error.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				pcxt->known_attached_workers[i] = true;
				++pcxt->nknown_attached_workers;
			}
			else
			{
				/*
				 * Worker not yet started, so we must wait.  The postmaster
				 * will notify us if the worker's state changes.  Our latch
				 * might also get set for some other reason, but if so we'll
				 * just end up waiting for the same worker again.
				 */
				rc = WaitLatch(MyLatch,
							   WL_LATCH_SET | WL_EXIT_ON_PM_DEATH,
							   -1, WAIT_EVENT_BGWORKER_STARTUP);

				if (rc & WL_LATCH_SET)
					ResetLatch(MyLatch);
			}
		}

		/*
		 * If all workers are known to have attached (or already exited
		 * cleanly), we're done.
		 */
		if (pcxt->nknown_attached_workers >= pcxt->nworkers_launched)
		{
			Assert(pcxt->nknown_attached_workers == pcxt->nworkers_launched);
			break;
		}
	}
}

/*
 * Wait for all workers to finish computing.
 *
 * Even if the parallel operation seems to have completed successfully, it's
 * important to call this function afterwards.  We must not miss any errors
 * the workers may have thrown during the parallel operation, or any that they
 * may yet throw while shutting down.
 *
 * Also, we want to update our notion of XactLastRecEnd based on worker
 * feedback.
 *
 * The wait loop only ends once every launched worker's error_mqh has been
 * cleared, i.e. every launched worker has exited cleanly.
 */
void
WaitForParallelWorkersToFinish(ParallelContext *pcxt)
{
	for (;;)
	{
		bool		anyone_alive = false;
		int			nfinished = 0;
		int			i;

		/*
		 * This will process any parallel messages that are pending, which may
		 * change the outcome of the loop that follows.  It may also throw an
		 * error propagated from a worker.
		 */
		CHECK_FOR_INTERRUPTS();

		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			/*
			 * If error_mqh is NULL, then the worker has already exited
			 * cleanly.  If we have received a message through error_mqh from
			 * the worker, we know it started up cleanly, and therefore we're
			 * certain to be notified when it exits.
			 */
			if (pcxt->worker[i].error_mqh == NULL)
				++nfinished;
			else if (pcxt->known_attached_workers[i])
			{
				anyone_alive = true;
				break;
			}
		}

		if (!anyone_alive)
		{
			/* If all workers are known to have finished, we're done. */
			if (nfinished >= pcxt->nworkers_launched)
			{
				Assert(nfinished == pcxt->nworkers_launched);
				break;
			}

			/*
			 * We didn't detect any living workers, but not all workers are
			 * known to have exited cleanly.  Either not all workers have
			 * launched yet, or maybe some of them failed to start or
			 * terminated abnormally.
			 */
			for (i = 0; i < pcxt->nworkers_launched; ++i)
			{
				pid_t		pid;
				shm_mq	   *mq;

				/*
				 * If the worker is BGWH_NOT_YET_STARTED or BGWH_STARTED, we
				 * should just keep waiting.  If it is BGWH_STOPPED, then
				 * further investigation is needed.
				 */
				if (pcxt->worker[i].error_mqh == NULL ||
					pcxt->worker[i].bgwhandle == NULL ||
					GetBackgroundWorkerPid(pcxt->worker[i].bgwhandle,
										   &pid) != BGWH_STOPPED)
					continue;

				/*
				 * Check whether the worker ended up stopped without ever
				 * attaching to the error queue.  If so, the postmaster was
				 * unable to fork the worker or it exited without initializing
				 * properly.  We must throw an error, since the caller may
				 * have been expecting the worker to do some work before
				 * exiting.
				 */
				mq = shm_mq_get_queue(pcxt->worker[i].error_mqh);
				if (shm_mq_get_sender(mq) == NULL)
					ereport(ERROR,
							(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
							 errmsg("parallel worker failed to initialize"),
							 errhint("More details may be available in the server log.")));

				/*
				 * The worker is stopped, but is attached to the error queue.
				 * Unless there's a bug somewhere, this will only happen when
				 * the worker writes messages and terminates after the
				 * CHECK_FOR_INTERRUPTS() near the top of this function and
				 * before the call to GetBackgroundWorkerPid().  In that case,
				 * our latch should have been set as well and the right things
				 * will happen on the next pass through the loop.
				 */
			}
		}

		(void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, -1,
						 WAIT_EVENT_PARALLEL_FINISH);
		ResetLatch(MyLatch);
	}

	/* Fold the workers' maximum WAL end position into our own. */
	if (pcxt->toc != NULL)
	{
		FixedParallelState *fps;

		fps = shm_toc_lookup(pcxt->toc, PARALLEL_KEY_FIXED, false);
		if (fps->last_xlog_end > XactLastRecEnd)
			XactLastRecEnd = fps->last_xlog_end;
	}
}

/*
 * Wait for all workers to exit.
 *
 * This function ensures that workers have been completely shut down.  The
 * difference between WaitForParallelWorkersToFinish and this function is
 * that the former just ensures that the last message sent by a worker
 * backend is received by the master backend, whereas this one ensures
 * complete shutdown.
 *
 * Each background worker handle is freed and cleared as its worker is
 * confirmed gone.  If the postmaster has died, this raises FATAL.
 */
static void
WaitForParallelWorkersToExit(ParallelContext *pcxt)
{
	int			i;

	/* Wait until the workers actually die. */
	for (i = 0; i < pcxt->nworkers_launched; ++i)
	{
		BgwHandleStatus status;

		if (pcxt->worker == NULL || pcxt->worker[i].bgwhandle == NULL)
			continue;

		status = WaitForBackgroundWorkerShutdown(pcxt->worker[i].bgwhandle);

		/*
		 * If the postmaster kicked the bucket, we have no chance of cleaning
		 * up safely -- we won't be able to tell when our workers are actually
		 * dead.  This doesn't necessitate a PANIC since they will all abort
		 * eventually, but we can't safely continue this session.
		 */
		if (status == BGWH_POSTMASTER_DIED)
			ereport(FATAL,
					(errcode(ERRCODE_ADMIN_SHUTDOWN),
					 errmsg("postmaster exited during a parallel transaction")));

		/* Release memory. */
		pfree(pcxt->worker[i].bgwhandle);
		pcxt->worker[i].bgwhandle = NULL;
	}
}

/*
 * Destroy a parallel context.
 *
 * If expecting a clean exit, you should use WaitForParallelWorkersToFinish()
 * first, before calling this function.  When this function is invoked, any
 * remaining workers are forcibly killed; the dynamic shared memory segment
 * is unmapped; and we then wait (uninterruptibly) for the workers to exit.
 */
void
DestroyParallelContext(ParallelContext *pcxt)
{
	int			i;

	/*
	 * Be careful about order of operations here!  We remove the parallel
	 * context from the list before we do anything else; otherwise, if an
	 * error occurs during a subsequent step, we might try to nuke it again
	 * from AtEOXact_Parallel or AtEOSubXact_Parallel.
	 */
	dlist_delete(&pcxt->node);

	/*
	 * Kill each worker in turn, and forget their error queues.  Only slots
	 * below nworkers_launched can still hold a queue, because
	 * LaunchParallelWorkers detached the queues of slots it failed to
	 * register.  TerminateBackgroundWorker only requests termination; the
	 * actual wait happens in WaitForParallelWorkersToExit below.
	 */
	if (pcxt->worker != NULL)
	{
		for (i = 0; i < pcxt->nworkers_launched; ++i)
		{
			if (pcxt->worker[i].error_mqh != NULL)
			{
				TerminateBackgroundWorker(pcxt->worker[i].bgwhandle);

				shm_mq_detach(pcxt->worker[i].error_mqh);
				pcxt->worker[i].error_mqh = NULL;
			}
		}
	}

	/*
	 * If we have allocated a shared memory segment, detach it.  This will
	 * implicitly detach the error queues, and any other shared memory queues,
	 * stored there.
	 */
	if (pcxt->seg != NULL)
	{
		dsm_detach(pcxt->seg);
		pcxt->seg = NULL;
	}

	/*
	 * If this parallel context is actually in backend-private memory rather
	 * than shared memory, free that memory instead.
	 */
	if (pcxt->private_memory != NULL)
	{
		pfree(pcxt->private_memory);
		pcxt->private_memory = NULL;
	}

	/*
	 * We can't finish transaction commit or abort until all of the workers
	 * have exited.  This means, in particular, that we can't respond to
	 * interrupts at this stage.
	 */
	HOLD_INTERRUPTS();
	WaitForParallelWorkersToExit(pcxt);
	RESUME_INTERRUPTS();

	/* Free the worker array itself. */
	if (pcxt->worker != NULL)
	{
		pfree(pcxt->worker);
		pcxt->worker = NULL;
	}

	/* Free memory. */
	pfree(pcxt->library_name);
	pfree(pcxt->function_name);
	pfree(pcxt);
}

/*
 * Are there any parallel contexts currently active?
 *
 * A context counts as active from CreateParallelContext() until
 * DestroyParallelContext() unlinks it from pcxt_list.
 */
bool
ParallelContextActive(void)
{
	return !dlist_is_empty(&pcxt_list);
}

/*
 * Handle receipt of an interrupt indicating a parallel worker message.
 *
 * Note: this is called within a signal handler!  All we can do is set
 * a flag that will cause the next CHECK_FOR_INTERRUPTS() to invoke
 * HandleParallelMessages().
967 */ 968 void 969 HandleParallelMessageInterrupt(void) 970 { 971 InterruptPending = true; 972 ParallelMessagePending = true; 973 SetLatch(MyLatch); 974 } 975 976 /* 977 * Handle any queued protocol messages received from parallel workers. 978 */ 979 void 980 HandleParallelMessages(void) 981 { 982 dlist_iter iter; 983 MemoryContext oldcontext; 984 985 static MemoryContext hpm_context = NULL; 986 987 /* 988 * This is invoked from ProcessInterrupts(), and since some of the 989 * functions it calls contain CHECK_FOR_INTERRUPTS(), there is a potential 990 * for recursive calls if more signals are received while this runs. It's 991 * unclear that recursive entry would be safe, and it doesn't seem useful 992 * even if it is safe, so let's block interrupts until done. 993 */ 994 HOLD_INTERRUPTS(); 995 996 /* 997 * Moreover, CurrentMemoryContext might be pointing almost anywhere. We 998 * don't want to risk leaking data into long-lived contexts, so let's do 999 * our work here in a private context that we can reset on each use. 1000 */ 1001 if (hpm_context == NULL) /* first time through? */ 1002 hpm_context = AllocSetContextCreate(TopMemoryContext, 1003 "HandleParallelMessages", 1004 ALLOCSET_DEFAULT_SIZES); 1005 else 1006 MemoryContextReset(hpm_context); 1007 1008 oldcontext = MemoryContextSwitchTo(hpm_context); 1009 1010 /* OK to process messages. Reset the flag saying there are more to do. */ 1011 ParallelMessagePending = false; 1012 1013 dlist_foreach(iter, &pcxt_list) 1014 { 1015 ParallelContext *pcxt; 1016 int i; 1017 1018 pcxt = dlist_container(ParallelContext, node, iter.cur); 1019 if (pcxt->worker == NULL) 1020 continue; 1021 1022 for (i = 0; i < pcxt->nworkers_launched; ++i) 1023 { 1024 /* 1025 * Read as many messages as we can from each worker, but stop when 1026 * either (1) the worker's error queue goes away, which can happen 1027 * if we receive a Terminate message from the worker; or (2) no 1028 * more messages can be read from the worker without blocking. 
1029 */ 1030 while (pcxt->worker[i].error_mqh != NULL) 1031 { 1032 shm_mq_result res; 1033 Size nbytes; 1034 void *data; 1035 1036 res = shm_mq_receive(pcxt->worker[i].error_mqh, &nbytes, 1037 &data, true); 1038 if (res == SHM_MQ_WOULD_BLOCK) 1039 break; 1040 else if (res == SHM_MQ_SUCCESS) 1041 { 1042 StringInfoData msg; 1043 1044 initStringInfo(&msg); 1045 appendBinaryStringInfo(&msg, data, nbytes); 1046 HandleParallelMessage(pcxt, i, &msg); 1047 pfree(msg.data); 1048 } 1049 else 1050 ereport(ERROR, 1051 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE), 1052 errmsg("lost connection to parallel worker"))); 1053 } 1054 } 1055 } 1056 1057 MemoryContextSwitchTo(oldcontext); 1058 1059 /* Might as well clear the context on our way out */ 1060 MemoryContextReset(hpm_context); 1061 1062 RESUME_INTERRUPTS(); 1063 } 1064 1065 /* 1066 * Handle a single protocol message received from a single parallel worker. 1067 */ 1068 static void 1069 HandleParallelMessage(ParallelContext *pcxt, int i, StringInfo msg) 1070 { 1071 char msgtype; 1072 1073 if (pcxt->known_attached_workers != NULL && 1074 !pcxt->known_attached_workers[i]) 1075 { 1076 pcxt->known_attached_workers[i] = true; 1077 pcxt->nknown_attached_workers++; 1078 } 1079 1080 msgtype = pq_getmsgbyte(msg); 1081 1082 switch (msgtype) 1083 { 1084 case 'K': /* BackendKeyData */ 1085 { 1086 int32 pid = pq_getmsgint(msg, 4); 1087 1088 (void) pq_getmsgint(msg, 4); /* discard cancel key */ 1089 (void) pq_getmsgend(msg); 1090 pcxt->worker[i].pid = pid; 1091 break; 1092 } 1093 1094 case 'E': /* ErrorResponse */ 1095 case 'N': /* NoticeResponse */ 1096 { 1097 ErrorData edata; 1098 ErrorContextCallback *save_error_context_stack; 1099 1100 /* Parse ErrorResponse or NoticeResponse. */ 1101 pq_parse_errornotice(msg, &edata); 1102 1103 /* Death of a worker isn't enough justification for suicide. 
*/ 1104 edata.elevel = Min(edata.elevel, ERROR); 1105 1106 /* 1107 * If desired, add a context line to show that this is a 1108 * message propagated from a parallel worker. Otherwise, it 1109 * can sometimes be confusing to understand what actually 1110 * happened. (We don't do this in FORCE_PARALLEL_REGRESS mode 1111 * because it causes test-result instability depending on 1112 * whether a parallel worker is actually used or not.) 1113 */ 1114 if (force_parallel_mode != FORCE_PARALLEL_REGRESS) 1115 { 1116 if (edata.context) 1117 edata.context = psprintf("%s\n%s", edata.context, 1118 _("parallel worker")); 1119 else 1120 edata.context = pstrdup(_("parallel worker")); 1121 } 1122 1123 /* 1124 * Context beyond that should use the error context callbacks 1125 * that were in effect when the ParallelContext was created, 1126 * not the current ones. 1127 */ 1128 save_error_context_stack = error_context_stack; 1129 error_context_stack = pcxt->error_context_stack; 1130 1131 /* Rethrow error or print notice. */ 1132 ThrowErrorData(&edata); 1133 1134 /* Not an error, so restore previous context stack. */ 1135 error_context_stack = save_error_context_stack; 1136 1137 break; 1138 } 1139 1140 case 'A': /* NotifyResponse */ 1141 { 1142 /* Propagate NotifyResponse. */ 1143 int32 pid; 1144 const char *channel; 1145 const char *payload; 1146 1147 pid = pq_getmsgint(msg, 4); 1148 channel = pq_getmsgrawstring(msg); 1149 payload = pq_getmsgrawstring(msg); 1150 pq_endmessage(msg); 1151 1152 NotifyMyFrontEnd(channel, payload, pid); 1153 1154 break; 1155 } 1156 1157 case 'X': /* Terminate, indicating clean exit */ 1158 { 1159 shm_mq_detach(pcxt->worker[i].error_mqh); 1160 pcxt->worker[i].error_mqh = NULL; 1161 break; 1162 } 1163 1164 default: 1165 { 1166 elog(ERROR, "unrecognized message type received from parallel worker: %c (message length %d bytes)", 1167 msgtype, msg->len); 1168 } 1169 } 1170 } 1171 1172 /* 1173 * End-of-subtransaction cleanup for parallel contexts. 
*
 * Currently, it's forbidden to enter or leave a subtransaction while
 * parallel mode is in effect, so we could just blow away everything.  But
 * we may want to relax that restriction in the future, so this code
 * contemplates that there may be multiple subtransaction IDs in pcxt_list.
 *
 * Contexts are taken from the head of pcxt_list, and the loop stops at the
 * first one whose subid differs from mySubId.  That assumes the contexts of
 * the innermost subtransaction sit at the head of the list (presumably new
 * contexts are pushed onto the head when created -- not visible here).
 * DestroyParallelContext() unlinks each context, so the loop makes progress.
 * On commit, any context still present here is unexpected and draws a
 * WARNING before being destroyed.
 */
void
AtEOSubXact_Parallel(bool isCommit, SubTransactionId mySubId)
{
	while (!dlist_is_empty(&pcxt_list))
	{
		ParallelContext *pcxt;

		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
		if (pcxt->subid != mySubId)
			break;
		if (isCommit)
			elog(WARNING, "leaked parallel context");
		DestroyParallelContext(pcxt);
	}
}

/*
 * End-of-transaction cleanup for parallel contexts.
 *
 * Destroys every context remaining on pcxt_list, regardless of which
 * subtransaction created it.  On commit, any remaining context is
 * unexpected and draws a WARNING before being destroyed.
 */
void
AtEOXact_Parallel(bool isCommit)
{
	while (!dlist_is_empty(&pcxt_list))
	{
		ParallelContext *pcxt;

		pcxt = dlist_head_element(ParallelContext, node, &pcxt_list);
		if (isCommit)
			elog(WARNING, "leaked parallel context");
		DestroyParallelContext(pcxt);
	}
}

/*
 * Main entrypoint for parallel workers.
 *
 * main_arg carries the handle of the parallel context's dynamic shared
 * memory segment (it is handed to dsm_attach()).  The worker:
 *   1. attaches to the segment and its table of contents;
 *   2. redirects protocol output to its own error queue;
 *   3. restores the leader's backend-local state from that segment;
 *   4. runs the caller-supplied entry point;
 *   5. reports a clean exit by sending a Terminate ('X') message.
 *
 * The order of the restore steps below matters; see the individual comments.
 */
void
ParallelWorkerMain(Datum main_arg)
{
	dsm_segment *seg;
	shm_toc    *toc;
	FixedParallelState *fps;
	char	   *error_queue_space;
	shm_mq	   *mq;
	shm_mq_handle *mqh;
	char	   *libraryspace;
	char	   *entrypointstate;
	char	   *library_name;
	char	   *function_name;
	parallel_worker_main_type entrypt;
	char	   *gucspace;
	char	   *combocidspace;
	char	   *tsnapspace;
	char	   *asnapspace;
	char	   *tstatespace;
	char	   *reindexspace;
	char	   *relmapperspace;
	char	   *enumblacklistspace;
	StringInfoData msgbuf;
	char	   *session_dsm_handle_space;
	Snapshot	tsnapshot;
	Snapshot	asnapshot;

	/* Set flag to indicate that we're initializing a parallel worker. */
	InitializingParallelWorker = true;

	/* Establish signal handlers. */
	pqsignal(SIGTERM, die);
	BackgroundWorkerUnblockSignals();

	/*
	 * Determine and set our parallel worker number.  It is copied from the
	 * bgw_extra area of our background worker entry, which the launching
	 * backend is presumed to have filled in.
	 */
	Assert(ParallelWorkerNumber == -1);
	memcpy(&ParallelWorkerNumber, MyBgworkerEntry->bgw_extra, sizeof(int));

	/* Set up a memory context to work in, just for cleanliness. */
	CurrentMemoryContext = AllocSetContextCreate(TopMemoryContext,
												 "Parallel worker",
												 ALLOCSET_DEFAULT_SIZES);

	/*
	 * Attach to the dynamic shared memory segment for the parallel query, and
	 * find its table of contents.
	 *
	 * Note: at this point, we have not created any ResourceOwner in this
	 * process.  This will result in our DSM mapping surviving until process
	 * exit, which is fine.  If there were a ResourceOwner, it would acquire
	 * ownership of the mapping, but we have no need for that.
	 */
	seg = dsm_attach(DatumGetUInt32(main_arg));
	if (seg == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("could not map dynamic shared memory segment")));
	toc = shm_toc_attach(PARALLEL_MAGIC, dsm_segment_address(seg));
	if (toc == NULL)
		ereport(ERROR,
				(errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
				 errmsg("invalid magic number in dynamic shared memory segment")));

	/* Look up fixed parallel state. */
	fps = shm_toc_lookup(toc, PARALLEL_KEY_FIXED, false);
	MyFixedParallelState = fps;

	/*
	 * Arrange to signal the leader if we exit.  ParallelWorkerShutdown uses
	 * the leader PID and backend ID saved here.
	 */
	ParallelMasterPid = fps->parallel_master_pid;
	ParallelMasterBackendId = fps->parallel_master_backend_id;
	on_shmem_exit(ParallelWorkerShutdown, (Datum) 0);

	/*
	 * Now we can find and attach to the error queue provided for us.  That's
	 * good, because until we do that, any errors that happen here will not be
	 * reported back to the process that requested that this worker be
	 * launched.
	 *
	 * The error-queue area holds one PARALLEL_ERROR_QUEUE_SIZE slot per
	 * worker, indexed by ParallelWorkerNumber.
	 */
	error_queue_space = shm_toc_lookup(toc, PARALLEL_KEY_ERROR_QUEUE, false);
	mq = (shm_mq *) (error_queue_space +
					 ParallelWorkerNumber * PARALLEL_ERROR_QUEUE_SIZE);
	shm_mq_set_sender(mq, MyProc);
	mqh = shm_mq_attach(mq, seg, NULL);
	pq_redirect_to_shm_mq(seg, mqh);
	pq_set_parallel_master(fps->parallel_master_pid,
						   fps->parallel_master_backend_id);

	/*
	 * Send a BackendKeyData message to the process that initiated parallelism
	 * so that it has access to our PID before it receives any other messages
	 * from us.  Our cancel key is sent, too, since that's the way the
	 * protocol message is defined, but it won't actually be used for anything
	 * in this case.
	 */
	pq_beginmessage(&msgbuf, 'K');
	pq_sendint32(&msgbuf, (int32) MyProcPid);
	pq_sendint32(&msgbuf, (int32) MyCancelKey);
	pq_endmessage(&msgbuf);

	/*
	 * Hooray! Primary initialization is complete.  Now, we need to set up our
	 * backend-local state to match the original backend.
	 */

	/*
	 * Join locking group.  We must do this before anything that could try to
	 * acquire a heavyweight lock, because any heavyweight locks acquired to
	 * this point could block either directly against the parallel group
	 * leader or against some process which in turn waits for a lock that
	 * conflicts with the parallel group leader, causing an undetected
	 * deadlock.  (If we can't join the lock group, the leader has gone away,
	 * so just exit quietly.)
	 */
	if (!BecomeLockGroupMember(fps->parallel_master_pgproc,
							   fps->parallel_master_pid))
		return;

	/*
	 * Restore transaction and statement start-time timestamps.  This must
	 * happen before anything that would start a transaction, else asserts in
	 * xact.c will fire.
	 */
	SetParallelStartTimestamps(fps->xact_ts, fps->stmt_ts);

	/*
	 * Identify the entry point to be called.  In theory this could result in
	 * loading an additional library, though most likely the entry point is in
	 * the core backend or in a library we just loaded.
	 *
	 * The entry-point area stores the library name and the function name
	 * back to back, each NUL-terminated.
	 */
	entrypointstate = shm_toc_lookup(toc, PARALLEL_KEY_ENTRYPOINT, false);
	library_name = entrypointstate;
	function_name = entrypointstate + strlen(library_name) + 1;

	entrypt = LookupParallelWorkerFunction(library_name, function_name);

	/* Restore database connection. */
	BackgroundWorkerInitializeConnectionByOid(fps->database_id,
											  fps->authenticated_user_id,
											  0);

	/*
	 * Set the client encoding to the database encoding, since that is what
	 * the leader will expect.
	 */
	SetClientEncoding(GetDatabaseEncoding());

	/*
	 * Load libraries that were loaded by original backend.  We want to do
	 * this before restoring GUCs, because the libraries might define custom
	 * variables.  Both steps run inside a short-lived transaction.
	 */
	libraryspace = shm_toc_lookup(toc, PARALLEL_KEY_LIBRARY, false);
	StartTransactionCommand();
	RestoreLibraryState(libraryspace);

	/* Restore GUC values from launching backend. */
	gucspace = shm_toc_lookup(toc, PARALLEL_KEY_GUC, false);
	RestoreGUCState(gucspace);
	CommitTransactionCommand();

	/* Crank up a transaction state appropriate to a parallel worker. */
	tstatespace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_STATE, false);
	StartParallelWorkerTransaction(tstatespace);

	/* Restore combo CID state. */
	combocidspace = shm_toc_lookup(toc, PARALLEL_KEY_COMBO_CID, false);
	RestoreComboCIDState(combocidspace);

	/* Attach to the per-session DSM segment and contained objects. */
	session_dsm_handle_space =
		shm_toc_lookup(toc, PARALLEL_KEY_SESSION_DSM, false);
	AttachSession(*(dsm_handle *) session_dsm_handle_space);

	/*
	 * If the transaction isolation level is REPEATABLE READ or SERIALIZABLE,
	 * the leader has serialized the transaction snapshot and we must restore
	 * it.  At lower isolation levels, there is no transaction-lifetime
	 * snapshot, but we need TransactionXmin to get set to a value which is
	 * less than or equal to the xmin of every snapshot that will be used by
	 * this worker.  The easiest way to accomplish that is to install the
	 * active snapshot as the transaction snapshot.  Code running in this
	 * parallel worker might take new snapshots via GetTransactionSnapshot()
	 * or GetLatestSnapshot(), but it shouldn't have any way of acquiring a
	 * snapshot older than the active snapshot.
	 *
	 * The transaction-snapshot lookup passes missing_ok = true because that
	 * entry may be absent; in that case the active snapshot is used for both.
	 */
	asnapspace = shm_toc_lookup(toc, PARALLEL_KEY_ACTIVE_SNAPSHOT, false);
	tsnapspace = shm_toc_lookup(toc, PARALLEL_KEY_TRANSACTION_SNAPSHOT, true);
	asnapshot = RestoreSnapshot(asnapspace);
	tsnapshot = tsnapspace ? RestoreSnapshot(tsnapspace) : asnapshot;
	RestoreTransactionSnapshot(tsnapshot,
							   fps->parallel_master_pgproc);
	PushActiveSnapshot(asnapshot);

	/*
	 * We've changed which tuples we can see, and must therefore invalidate
	 * system caches.
	 */
	InvalidateSystemCaches();

	/*
	 * Restore current role id.  Skip verifying whether session user is
	 * allowed to become this role and blindly restore the leader's state for
	 * current role.
	 */
	SetCurrentRoleId(fps->outer_user_id, fps->is_superuser);

	/* Restore user ID and security context. */
	SetUserIdAndSecContext(fps->current_user_id, fps->sec_context);

	/* Restore temp-namespace state to ensure search path matches leader's. */
	SetTempNamespaceState(fps->temp_namespace_id,
						  fps->temp_toast_namespace_id);

	/* Restore reindex state. */
	reindexspace = shm_toc_lookup(toc, PARALLEL_KEY_REINDEX_STATE, false);
	RestoreReindexState(reindexspace);

	/* Restore relmapper state. */
	relmapperspace = shm_toc_lookup(toc, PARALLEL_KEY_RELMAPPER_STATE, false);
	RestoreRelationMap(relmapperspace);

	/* Restore enum blacklist. */
	enumblacklistspace = shm_toc_lookup(toc, PARALLEL_KEY_ENUMBLACKLIST,
										false);
	RestoreEnumBlacklist(enumblacklistspace);

	/* Attach to the leader's serializable transaction, if SERIALIZABLE. */
	AttachSerializableXact(fps->serializable_xact_handle);

	/*
	 * We've initialized all of our state now; nothing should change
	 * hereafter.
	 */
	InitializingParallelWorker = false;
	EnterParallelMode();

	/*
	 * Time to do the real work: invoke the caller-supplied code.
	 */
	entrypt(seg, toc);

	/* Must exit parallel mode to pop active snapshot. */
	ExitParallelMode();

	/* Must pop active snapshot so snapmgr.c doesn't complain. */
	PopActiveSnapshot();

	/* Shut down the parallel-worker transaction. */
	EndParallelWorkerTransaction();

	/* Detach from the per-session DSM segment. */
	DetachSession();

	/*
	 * Report success.  The leader treats 'X' as a clean exit and detaches
	 * from our error queue.
	 */
	pq_putmessage('X', NULL, 0);
}

/*
 * Update shared memory with the ending location of the last WAL record we
 * wrote, if it's greater than the value already stored there.
1476 */ 1477 void 1478 ParallelWorkerReportLastRecEnd(XLogRecPtr last_xlog_end) 1479 { 1480 FixedParallelState *fps = MyFixedParallelState; 1481 1482 Assert(fps != NULL); 1483 SpinLockAcquire(&fps->mutex); 1484 if (fps->last_xlog_end < last_xlog_end) 1485 fps->last_xlog_end = last_xlog_end; 1486 SpinLockRelease(&fps->mutex); 1487 } 1488 1489 /* 1490 * Make sure the leader tries to read from our error queue one more time. 1491 * This guards against the case where we exit uncleanly without sending an 1492 * ErrorResponse to the leader, for example because some code calls proc_exit 1493 * directly. 1494 */ 1495 static void 1496 ParallelWorkerShutdown(int code, Datum arg) 1497 { 1498 SendProcSignal(ParallelMasterPid, 1499 PROCSIG_PARALLEL_MESSAGE, 1500 ParallelMasterBackendId); 1501 } 1502 1503 /* 1504 * Look up (and possibly load) a parallel worker entry point function. 1505 * 1506 * For functions contained in the core code, we use library name "postgres" 1507 * and consult the InternalParallelWorkers array. External functions are 1508 * looked up, and loaded if necessary, using load_external_function(). 1509 * 1510 * The point of this is to pass function names as strings across process 1511 * boundaries. We can't pass actual function addresses because of the 1512 * possibility that the function has been loaded at a different address 1513 * in a different process. This is obviously a hazard for functions in 1514 * loadable libraries, but it can happen even for functions in the core code 1515 * on platforms using EXEC_BACKEND (e.g., Windows). 1516 * 1517 * At some point it might be worthwhile to get rid of InternalParallelWorkers[] 1518 * in favor of applying load_external_function() for core functions too; 1519 * but that raises portability issues that are not worth addressing now. 
1520 */ 1521 static parallel_worker_main_type 1522 LookupParallelWorkerFunction(const char *libraryname, const char *funcname) 1523 { 1524 /* 1525 * If the function is to be loaded from postgres itself, search the 1526 * InternalParallelWorkers array. 1527 */ 1528 if (strcmp(libraryname, "postgres") == 0) 1529 { 1530 int i; 1531 1532 for (i = 0; i < lengthof(InternalParallelWorkers); i++) 1533 { 1534 if (strcmp(InternalParallelWorkers[i].fn_name, funcname) == 0) 1535 return InternalParallelWorkers[i].fn_addr; 1536 } 1537 1538 /* We can only reach this by programming error. */ 1539 elog(ERROR, "internal function \"%s\" not found", funcname); 1540 } 1541 1542 /* Otherwise load from external library. */ 1543 return (parallel_worker_main_type) 1544 load_external_function(libraryname, funcname, true, NULL); 1545 } 1546