1 /*-------------------------------------------------------------------------- 2 * 3 * setup.c 4 * Code to set up a dynamic shared memory segments and a specified 5 * number of background workers for shared memory message queue 6 * testing. 7 * 8 * Copyright (c) 2013-2019, PostgreSQL Global Development Group 9 * 10 * IDENTIFICATION 11 * src/test/modules/test_shm_mq/setup.c 12 * 13 * ------------------------------------------------------------------------- 14 */ 15 16 #include "postgres.h" 17 18 #include "miscadmin.h" 19 #include "pgstat.h" 20 #include "postmaster/bgworker.h" 21 #include "storage/procsignal.h" 22 #include "storage/shm_toc.h" 23 #include "utils/memutils.h" 24 25 #include "test_shm_mq.h" 26 27 typedef struct 28 { 29 int nworkers; 30 BackgroundWorkerHandle *handle[FLEXIBLE_ARRAY_MEMBER]; 31 } worker_state; 32 33 static void setup_dynamic_shared_memory(int64 queue_size, int nworkers, 34 dsm_segment **segp, 35 test_shm_mq_header **hdrp, 36 shm_mq **outp, shm_mq **inp); 37 static worker_state *setup_background_workers(int nworkers, 38 dsm_segment *seg); 39 static void cleanup_background_workers(dsm_segment *seg, Datum arg); 40 static void wait_for_workers_to_become_ready(worker_state *wstate, 41 volatile test_shm_mq_header *hdr); 42 static bool check_worker_status(worker_state *wstate); 43 44 /* 45 * Set up a dynamic shared memory segment and zero or more background workers 46 * for a test run. 47 */ 48 void 49 test_shm_mq_setup(int64 queue_size, int32 nworkers, dsm_segment **segp, 50 shm_mq_handle **output, shm_mq_handle **input) 51 { 52 dsm_segment *seg; 53 test_shm_mq_header *hdr; 54 shm_mq *outq = NULL; /* placate compiler */ 55 shm_mq *inq = NULL; /* placate compiler */ 56 worker_state *wstate; 57 58 /* Set up a dynamic shared memory segment. */ 59 setup_dynamic_shared_memory(queue_size, nworkers, &seg, &hdr, &outq, &inq); 60 *segp = seg; 61 62 /* Register background workers. */ 63 wstate = setup_background_workers(nworkers, seg); 64 65 /* Attach the queues. */ 66 *output = shm_mq_attach(outq, seg, wstate->handle[0]); 67 *input = shm_mq_attach(inq, seg, wstate->handle[nworkers - 1]); 68 69 /* Wait for workers to become ready. */ 70 wait_for_workers_to_become_ready(wstate, hdr); 71 72 /* 73 * Once we reach this point, all workers are ready. We no longer need to 74 * kill them if we die; they'll die on their own as the message queues 75 * shut down. 76 */ 77 cancel_on_dsm_detach(seg, cleanup_background_workers, 78 PointerGetDatum(wstate)); 79 pfree(wstate); 80 } 81 82 /* 83 * Set up a dynamic shared memory segment. 84 * 85 * We set up a small control region that contains only a test_shm_mq_header, 86 * plus one region per message queue. There are as many message queues as 87 * the number of workers, plus one. 88 */ 89 static void 90 setup_dynamic_shared_memory(int64 queue_size, int nworkers, 91 dsm_segment **segp, test_shm_mq_header **hdrp, 92 shm_mq **outp, shm_mq **inp) 93 { 94 shm_toc_estimator e; 95 int i; 96 Size segsize; 97 dsm_segment *seg; 98 shm_toc *toc; 99 test_shm_mq_header *hdr; 100 101 /* Ensure a valid queue size. */ 102 if (queue_size < 0 || ((uint64) queue_size) < shm_mq_minimum_size) 103 ereport(ERROR, 104 (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 105 errmsg("queue size must be at least %zu bytes", 106 shm_mq_minimum_size))); 107 if (queue_size != ((Size) queue_size)) 108 ereport(ERROR, 109 (errcode(ERRCODE_INVALID_PARAMETER_VALUE), 110 errmsg("queue size overflows size_t"))); 111 112 /* 113 * Estimate how much shared memory we need. 114 * 115 * Because the TOC machinery may choose to insert padding of oddly-sized 116 * requests, we must estimate each chunk separately. 117 * 118 * We need one key to register the location of the header, and we need 119 * nworkers + 1 keys to track the locations of the message queues. 120 */ 121 shm_toc_initialize_estimator(&e); 122 shm_toc_estimate_chunk(&e, sizeof(test_shm_mq_header)); 123 for (i = 0; i <= nworkers; ++i) 124 shm_toc_estimate_chunk(&e, (Size) queue_size); 125 shm_toc_estimate_keys(&e, 2 + nworkers); 126 segsize = shm_toc_estimate(&e); 127 128 /* Create the shared memory segment and establish a table of contents. */ 129 seg = dsm_create(shm_toc_estimate(&e), 0); 130 toc = shm_toc_create(PG_TEST_SHM_MQ_MAGIC, dsm_segment_address(seg), 131 segsize); 132 133 /* Set up the header region. */ 134 hdr = shm_toc_allocate(toc, sizeof(test_shm_mq_header)); 135 SpinLockInit(&hdr->mutex); 136 hdr->workers_total = nworkers; 137 hdr->workers_attached = 0; 138 hdr->workers_ready = 0; 139 shm_toc_insert(toc, 0, hdr); 140 141 /* Set up one message queue per worker, plus one. */ 142 for (i = 0; i <= nworkers; ++i) 143 { 144 shm_mq *mq; 145 146 mq = shm_mq_create(shm_toc_allocate(toc, (Size) queue_size), 147 (Size) queue_size); 148 shm_toc_insert(toc, i + 1, mq); 149 150 if (i == 0) 151 { 152 /* We send messages to the first queue. */ 153 shm_mq_set_sender(mq, MyProc); 154 *outp = mq; 155 } 156 if (i == nworkers) 157 { 158 /* We receive messages from the last queue. */ 159 shm_mq_set_receiver(mq, MyProc); 160 *inp = mq; 161 } 162 } 163 164 /* Return results to caller. */ 165 *segp = seg; 166 *hdrp = hdr; 167 } 168 169 /* 170 * Register background workers. 171 */ 172 static worker_state * 173 setup_background_workers(int nworkers, dsm_segment *seg) 174 { 175 MemoryContext oldcontext; 176 BackgroundWorker worker; 177 worker_state *wstate; 178 int i; 179 180 /* 181 * We need the worker_state object and the background worker handles to 182 * which it points to be allocated in CurTransactionContext rather than 183 * ExprContext; otherwise, they'll be destroyed before the on_dsm_detach 184 * hooks run. 185 */ 186 oldcontext = MemoryContextSwitchTo(CurTransactionContext); 187 188 /* Create worker state object. */ 189 wstate = MemoryContextAlloc(TopTransactionContext, 190 offsetof(worker_state, handle) + 191 sizeof(BackgroundWorkerHandle *) * nworkers); 192 wstate->nworkers = 0; 193 194 /* 195 * Arrange to kill all the workers if we abort before all workers are 196 * finished hooking themselves up to the dynamic shared memory segment. 197 * 198 * If we die after all the workers have finished hooking themselves up to 199 * the dynamic shared memory segment, we'll mark the two queues to which 200 * we're directly connected as detached, and the worker(s) connected to 201 * those queues will exit, marking any other queues to which they are 202 * connected as detached. This will cause any as-yet-unaware workers 203 * connected to those queues to exit in their turn, and so on, until 204 * everybody exits. 205 * 206 * But suppose the workers which are supposed to connect to the queues to 207 * which we're directly attached exit due to some error before they 208 * actually attach the queues. The remaining workers will have no way of 209 * knowing this. From their perspective, they're still waiting for those 210 * workers to start, when in fact they've already died. 211 */ 212 on_dsm_detach(seg, cleanup_background_workers, 213 PointerGetDatum(wstate)); 214 215 /* Configure a worker. */ 216 memset(&worker, 0, sizeof(worker)); 217 worker.bgw_flags = BGWORKER_SHMEM_ACCESS; 218 worker.bgw_start_time = BgWorkerStart_ConsistentState; 219 worker.bgw_restart_time = BGW_NEVER_RESTART; 220 sprintf(worker.bgw_library_name, "test_shm_mq"); 221 sprintf(worker.bgw_function_name, "test_shm_mq_main"); 222 snprintf(worker.bgw_type, BGW_MAXLEN, "test_shm_mq"); 223 worker.bgw_main_arg = UInt32GetDatum(dsm_segment_handle(seg)); 224 /* set bgw_notify_pid, so we can detect if the worker stops */ 225 worker.bgw_notify_pid = MyProcPid; 226 227 /* Register the workers. */ 228 for (i = 0; i < nworkers; ++i) 229 { 230 if (!RegisterDynamicBackgroundWorker(&worker, &wstate->handle[i])) 231 ereport(ERROR, 232 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 233 errmsg("could not register background process"), 234 errhint("You may need to increase max_worker_processes."))); 235 ++wstate->nworkers; 236 } 237 238 /* All done. */ 239 MemoryContextSwitchTo(oldcontext); 240 return wstate; 241 } 242 243 static void 244 cleanup_background_workers(dsm_segment *seg, Datum arg) 245 { 246 worker_state *wstate = (worker_state *) DatumGetPointer(arg); 247 248 while (wstate->nworkers > 0) 249 { 250 --wstate->nworkers; 251 TerminateBackgroundWorker(wstate->handle[wstate->nworkers]); 252 } 253 } 254 255 static void 256 wait_for_workers_to_become_ready(worker_state *wstate, 257 volatile test_shm_mq_header *hdr) 258 { 259 bool result = false; 260 261 for (;;) 262 { 263 int workers_ready; 264 265 /* If all the workers are ready, we have succeeded. */ 266 SpinLockAcquire(&hdr->mutex); 267 workers_ready = hdr->workers_ready; 268 SpinLockRelease(&hdr->mutex); 269 if (workers_ready >= wstate->nworkers) 270 { 271 result = true; 272 break; 273 } 274 275 /* If any workers (or the postmaster) have died, we have failed. */ 276 if (!check_worker_status(wstate)) 277 { 278 result = false; 279 break; 280 } 281 282 /* Wait to be signalled. */ 283 (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0, 284 PG_WAIT_EXTENSION); 285 286 /* Reset the latch so we don't spin. */ 287 ResetLatch(MyLatch); 288 289 /* An interrupt may have occurred while we were waiting. */ 290 CHECK_FOR_INTERRUPTS(); 291 } 292 293 if (!result) 294 ereport(ERROR, 295 (errcode(ERRCODE_INSUFFICIENT_RESOURCES), 296 errmsg("one or more background workers failed to start"))); 297 } 298 299 static bool 300 check_worker_status(worker_state *wstate) 301 { 302 int n; 303 304 /* If any workers (or the postmaster) have died, we have failed. */ 305 for (n = 0; n < wstate->nworkers; ++n) 306 { 307 BgwHandleStatus status; 308 pid_t pid; 309 310 status = GetBackgroundWorkerPid(wstate->handle[n], &pid); 311 if (status == BGWH_STOPPED || status == BGWH_POSTMASTER_DIED) 312 return false; 313 } 314 315 /* Otherwise, things still look OK. */ 316 return true; 317 } 318