1 /*-------------------------------------------------------------------------- 2 * 3 * setup.c 4 * Code to set up a dynamic shared memory segments and a specified 5 * number of background workers for shared memory message queue 6 * testing. 7 * 8 * Copyright (c) 2013-2020, PostgreSQL Global Development Group 9 * 10 * IDENTIFICATION 11 * src/test/modules/test_shm_mq/setup.c 12 * 13 * ------------------------------------------------------------------------- 14 */ 15 16 #include "postgres.h" 17 18 #include "miscadmin.h" 19 #include "pgstat.h" 20 #include "postmaster/bgworker.h" 21 #include "storage/procsignal.h" 22 #include "storage/shm_toc.h" 23 #include "test_shm_mq.h" 24 #include "utils/memutils.h" 25 26 typedef struct 27 { 28 int nworkers; 29 BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER]; 30 } worker_state; 31 32 static void setup_dynamic_shared_memory(int64 queue_size, int nworkers, 33 dsm_segment **segp, 34 test_shm_mq_header **hdrp, 35 shm_mq **outp, shm_mq **inp); 36 static worker_state *setup_background_workers(int nworkers, 37 dsm_segment *seg); 38 static void cleanup_background_workers(dsm_segment *seg, Datum arg); 39 static void wait_for_workers_to_become_ready(worker_state *wstate, 40 volatile test_shm_mq_header *hdr); 41 static bool check_worker_status(worker_state *wstate); 42 43 /* 44 * Set up a dynamic shared memory segment and zero or more background workers 45 * for a test run. 46 */ 47 void 48 test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, 49 shm_mq_handle **output, shm_mq_handle **input) 50 { 51 dsm_segment *seg; 52 test_shm_mq_header *hdr; 53 shm_mq *outq = NULL; /* placate compiler */ 54 shm_mq *inq = NULL; /* placate compiler */ 55 worker_state *wstate; 56 57 /* Set up a dynamic shared memory segment. */ 58 setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq); 59 *segp = seg; 60 61 /* Register background workers. */ 62 wstate = setup_background_workers(nworkers, seg); 63 64 /* Attach the queues. */ 65 *output = shm_mq_attach(outq, seg, wstate->handle[0]); 66 *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]); 67 68 /* Wait for workers to become ready. */ 69 wait_for_workers_to_become_ready(wstate, hdr); 70 71 /* 72 * Once we reach this point, all workers are ready. We no longer need to 73 * kill them if we die; they'll die on their own as the message queues 74 * shut down. 75 */ 76 cancel_on_dsm_detach(seg, cleanup_background_workers, 77 PointerGetDatum(wstate)); 78 pfree(wstate); 79 } 80 81 /* 82 * Set up a dynamic shared memory segment. 83 * 84 * We set up a small control region that contains only a test_shm_mq_header, 85 * plus one region per message queue. There are as many message queues as 86 * the number of workers, plus one. 87 */ 88 static void 89 setup_dynamic_shared_memory(int64 queue_size, int nworkers, 90 dsm_segment **segp, test_shm_mq_header **hdrp, 91 shm_mq **outp, shm_mq **inp) 92 { 93 shm_toc_estimator e; 94 int i; 95 Size segsize; 96 dsm_segment *seg; 97 shm_toc *toc; 98 test_shm_mq_header *hdr; 99 100 /* Ensure a valid queue size. */ 101 if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size) 102 ereport(ERROR, 103 (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 104 errmsg("queue size must be at least %zu bytes", 105 shm_mq_minimum_size))); 106 if (queue_size != ((Size) queue_size)) 107 ereport(ERROR, 108 (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 109 errmsg("queue size overflows size_t"))); 110 111 /* 112 * Estimate how much shared memory we need. 113 * 114 * Because the TOC machinery may choose to insert padding of oddly-sized 115 * requests, we must estimate each chunk separately. 116 * 117 * We need one key to register the location of the header, and we need 118 * nworkers + 1 keys to track the locations of the message queues. 119 */ 120 shm_toc_initialize_estimator(&e); 121 shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header)); 122 for (i = 0; i <= nworkers; ++i) 123 shm_toc_estimate_chunk(&e, (Size) queue_size); 124 shm_toc_estimate_keys(&e, 2 + nworkers); 125 segsize = shm_toc_estimate(&e); 126 127 /* Create the shared memory segment and establish a table of contents. */ 128 seg = dsm_create(shm_toc_estimate(&e), 0); 129 toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg), 130 segsize); 131 132 /* Set up the header region. */ 133 hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header)); 134 SpinLockInit(&hdr->mutex); 135 hdr->workers_total = nworkers; 136 hdr->workers_attached = 0; 137 hdr->workers_ready = 0; 138 shm_toc_insert(toc, 0, hdr); 139 140 /* Set up one message queue per worker, plus one. */ 141 for (i = 0; i <= nworkers; ++i) 142 { 143 shm_mq *mq; 144 145 mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size), 146 (Size) queue_size); 147 shm_toc_insert(toc, i + 1, mq); 148 149 if (i == 0) 150 { 151 /* We send messages to the first queue. */ 152 shm_mq_set_sender(mq, MyProc); 153 *outp = mq; 154 } 155 if (i == nworkers) 156 { 157 /* We receive messages from the last queue. */ 158 shm_mq_set_receiver(mq, MyProc); 159 *inp = mq; 160 } 161 } 162 163 /* Return results to caller. */ 164 *segp = seg; 165 *hdrp = hdr; 166 } 167 168 /* 169 * Register background workers. 170 */ 171 static worker_state * 172 setup_background_workers(int nworkers, dsm_segment *seg) 173 { 174 MemoryContext oldcontext; 175 BackgroundWorker worker; 176 worker_state *wstate; 177 int i; 178 179 /* 180 * We need the worker_state object and the background worker handles to 181 * which it points to be allocated in CurTransactionContext rather than 182 * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach 183 * hooks run. 184 */ 185 oldcontext = MemoryContextSwitchTo(CurTransactionContext); 186 187 /* Create worker state object. */ 188 wstate = MemoryContextAlloc(TopTransactionContext, 189 offsetof(worker_state, handle) + 190 sizeof(BackgroundWorkerHandle *) * nworkers); 191 wstate->nworkers = 0; 192 193 /* 194 * Arrange to kill all the workers if we abort before all workers are 195 * finished hooking themselves up to the dynamic shared memory segment. 196 * 197 * If we die after all the workers have finished hooking themselves up to 198 * the dynamic shared memory segment, we'll mark the two queues to which 199 * we're directly connected as detached, and the worker(s) connected to 200 * those queues will exit, marking any other queues to which they are 201 * connected as detached. This will cause any as-yet-unaware workers 202 * connected to those queues to exit in their turn, and so on, until 203 * everybody exits. 204 * 205 * But suppose the workers which are supposed to connect to the queues to 206 * which we're directly attached exit due to some error before they 207 * actually attach the queues. The remaining workers will have no way of 208 * knowing this. From their perspective, they're still waiting for those 209 * workers to start, when in fact they've already died. 210 */ 211 on_dsm_detach(seg, cleanup_background_workers, 212 PointerGetDatum(wstate)); 213 214 /* Configure a worker. */ 215 memset(&worker, 0, sizeof(worker)); 216 worker.bgw_flags = BGWORKER_SHMEM_ACCESS; 217 worker.bgw_start_time = BgWorkerStart_ConsistentState; 218 worker.bgw_restart_time = BGW_NEVER_RESTART; 219 sprintf(worker.bgw_library_name, "test_shm_mq"); 220 sprintf(worker.bgw_function_name, "test_shm_mq_main"); 221 snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq"); 222 worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); 223 /* set bgw_notify_pid, so we can detect if the worker stops */ 224 worker.bgw_notify_pid = MyProcPid; 225 226 /* Register the workers. */ 227 for (i = 0; i < nworkers; ++i) 228 { 229 if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i])) 230 ereport(ERROR, 231 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 232 errmsg("could not register background process"), 233 errhint("You may need to increase max_worker_processes."))); 234 ++wstate->nworkers; 235 } 236 237 /* All done. */ 238 MemoryContextSwitchTo(oldcontext); 239 return wstate; 240 } 241 242 static void 243 cleanup_background_workers(dsm_segment *seg, Datum arg) 244 { 245 worker_state *wstate = (worker_state *) DatumGetPointer(arg); 246 247 while (wstate->nworkers > 0) 248 { 249 --wstate->nworkers; 250 TerminateBackgroundWorker(wstate->handle[wstate->nworkers]); 251 } 252 } 253 254 static void 255 wait_for_workers_to_become_ready(worker_state *wstate, 256 volatile test_shm_mq_header *hdr) 257 { 258 bool result = false; 259 260 for (;;) 261 { 262 int workers_ready; 263 264 /* If all the workers are ready, we have succeeded. */ 265 SpinLockAcquire(&hdr->mutex); 266 workers_ready = hdr->workers_ready; 267 SpinLockRelease(&hdr->mutex); 268 if (workers_ready >= wstate->nworkers) 269 { 270 result = true; 271 break; 272 } 273 274 /* If any workers (or the postmaster) have died, we have failed. */ 275 if (!check_worker_status(wstate)) 276 { 277 result = false; 278 break; 279 } 280 281 /* Wait to be signaled. */ 282 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, 283 PG_WAIT_EXTENSION); 284 285 /* Reset the latch so we don't spin. */ 286 ResetLatch(MyLatch); 287 288 /* An interrupt may have occurred while we were waiting. */ 289 CHECK_FOR_INTERRUPTS(); 290 } 291 292 if (!result) 293 ereport(ERROR, 294 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 295 errmsg("one or more background workers failed to start"))); 296 } 297 298 static bool 299 check_worker_status(worker_state *wstate) 300 { 301 int n; 302 303 /* If any workers (or the postmaster) have died, we have failed. */ 304 for (n = 0; n < wstate->nworkers; ++n) 305 { 306 BgwHandleStatus status; 307 pid_t pid; 308 309 status = GetBackgroundWorkerPid(wstate->handle[n], &pid); 310 if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED) 311 return false; 312 } 313 314 /* Otherwise, things still look OK. */ 315 return true; 316 } 317