1 /*------------------------------------------------------------------------- 2 * 3 * barrier.c 4 * Barriers for synchronizing cooperating processes. 5 * 6 * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group 7 * Portions Copyright (c) 1994, Regents of the University of California 8 * 9 * From Wikipedia[1]: "In parallel computing, a barrier is a type of 10 * synchronization method. A barrier for a group of threads or processes in 11 * the source code means any thread/process must stop at this point and cannot 12 * proceed until all other threads/processes reach this barrier." 13 * 14 * This implementation of barriers allows for static sets of participants 15 * known up front, or dynamic sets of participants which processes can join or 16 * leave at any time. In the dynamic case, a phase number can be used to 17 * track progress through a parallel algorithm, and may be necessary to 18 * synchronize with the current phase of a multi-phase algorithm when a new 19 * participant joins. In the static case, the phase number is used 20 * internally, but it isn't strictly necessary for client code to access it 21 * because the phase can only advance when the declared number of participants 22 * reaches the barrier, so client code should be in no doubt about the current 23 * phase of computation at all times. 24 * 25 * Consider a parallel algorithm that involves separate phases of computation 26 * A, B and C where the output of each phase is needed before the next phase 27 * can begin. 28 * 29 * In the case of a static barrier initialized with 4 participants, each 30 * participant works on phase A, then calls BarrierArriveAndWait to wait until 31 * all 4 participants have reached that point. When BarrierArriveAndWait 32 * returns control, each participant can work on B, and so on. Because the 33 * barrier knows how many participants to expect, the phases of computation 34 * don't need labels or numbers, since each process's program counter implies 35 * the current phase. Even if some of the processes are slow to start up and 36 * begin running phase A, the other participants are expecting them and will 37 * patiently wait at the barrier. The code could be written as follows: GetFreeIndexPage(Relation rel)38 * 39 * perform_a(); 40 * BarrierArriveAndWait(&barrier, ...); 41 * perform_b(); 42 * BarrierArriveAndWait(&barrier, ...); 43 * perform_c(); 44 * BarrierArriveAndWait(&barrier, ...); 45 * 46 * If the number of participants is not known up front, then a dynamic barrier 47 * is needed and the number should be set to zero at initialization. New 48 * complications arise because the number necessarily changes over time as 49 * participants attach and detach, and therefore phases B, C or even the end 50 * of processing may be reached before any given participant has started 51 * running and attached. Therefore the client code must perform an initial 52 * test of the phase number after attaching, because it needs to find out 53 * which phase of the algorithm has been reached by any participants that are 54 * already attached in order to synchronize with that work. Once the program 55 * counter or some other representation of current progress is synchronized 56 * with the barrier's phase, normal control flow can be used just as in the 57 * static case. Our example could be written using a switch statement with 58 * cases that fall-through, as follows: 59 * 60 * phase = BarrierAttach(&barrier); 61 * switch (phase) 62 * { 63 * case PHASE_A: 64 * perform_a(); 65 * BarrierArriveAndWait(&barrier, ...); 66 * case PHASE_B: 67 * perform_b(); 68 * BarrierArriveAndWait(&barrier, ...); 69 * case PHASE_C: 70 * perform_c(); 71 * BarrierArriveAndWait(&barrier, ...); 72 * } 73 * BarrierDetach(&barrier); 74 * 75 * Static barriers behave similarly to POSIX's pthread_barrier_t. Dynamic 76 * barriers behave similarly to Java's java.util.concurrent.Phaser. 77 * 78 * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science) 79 * 80 * IDENTIFICATION 81 * src/backend/storage/ipc/barrier.c 82 * 83 *------------------------------------------------------------------------- 84 */ 85 86 #include "postgres.h" 87 #include "storage/barrier.h" 88 89 static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive); 90 91 /* 92 * Initialize this barrier. To use a static party size, provide the number of 93 * participants to wait for at each phase indicating that that number of 94 * backends is implicitly attached. To use a dynamic party size, specify zero 95 * here and then use BarrierAttach() and 96 * BarrierDetach()/BarrierArriveAndDetach() to register and deregister 97 * participants explicitly. 98 */ 99 void 100 BarrierInit(Barrier *barrier, int participants) 101 { 102 SpinLockInit(&barrier->mutex); 103 barrier->participants = participants; 104 barrier->arrived = 0; 105 barrier->phase = 0; 106 barrier->elected = 0; 107 barrier->static_party = participants > 0; 108 ConditionVariableInit(&barrier->condition_variable); 109 } 110 111 /* 112 * Arrive at this barrier, wait for all other attached participants to arrive 113 * too and then return. Increments the current phase. The caller must be 114 * attached. 115 * 116 * While waiting, pg_stat_activity shows a wait_event_type and wait_event 117 * controlled by the wait_event_info passed in, which should be a value from 118 * one of the WaitEventXXX enums defined in pgstat.h. 119 * 120 * Return true in one arbitrarily chosen participant. Return false in all 121 * others. The return code can be used to elect one participant to execute a 122 * phase of work that must be done serially while other participants wait. 123 */ 124 bool 125 BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info) 126 { 127 bool release = false; 128 bool elected; 129 int start_phase; 130 int next_phase; 131 132 SpinLockAcquire(&barrier->mutex); 133 start_phase = barrier->phase; 134 next_phase = start_phase + 1; 135 ++barrier->arrived; 136 if (barrier->arrived == barrier->participants) 137 { 138 release = true; 139 barrier->arrived = 0; 140 barrier->phase = next_phase; 141 barrier->elected = next_phase; 142 } 143 SpinLockRelease(&barrier->mutex); 144 145 /* 146 * If we were the last expected participant to arrive, we can release our 147 * peers and return true to indicate that this backend has been elected to 148 * perform any serial work. 149 */ 150 if (release) 151 { 152 ConditionVariableBroadcast(&barrier->condition_variable); 153 154 return true; 155 } 156 157 /* 158 * Otherwise we have to wait for the last participant to arrive and 159 * advance the phase. 160 */ 161 elected = false; 162 ConditionVariablePrepareToSleep(&barrier->condition_variable); 163 for (;;) 164 { 165 /* 166 * We know that phase must either be start_phase, indicating that we 167 * need to keep waiting, or next_phase, indicating that the last 168 * participant that we were waiting for has either arrived or detached 169 * so that the next phase has begun. The phase cannot advance any 170 * further than that without this backend's participation, because 171 * this backend is attached. 172 */ 173 SpinLockAcquire(&barrier->mutex); 174 Assert(barrier->phase == start_phase || barrier->phase == next_phase); 175 release = barrier->phase == next_phase; 176 if (release && barrier->elected != next_phase) 177 { 178 /* 179 * Usually the backend that arrives last and releases the other 180 * backends is elected to return true (see above), so that it can 181 * begin processing serial work while it has a CPU timeslice. 182 * However, if the barrier advanced because someone detached, then 183 * one of the backends that is awoken will need to be elected. 184 */ 185 barrier->elected = barrier->phase; 186 elected = true; 187 } 188 SpinLockRelease(&barrier->mutex); 189 if (release) 190 break; 191 ConditionVariableSleep(&barrier->condition_variable, wait_event_info); 192 } 193 ConditionVariableCancelSleep(); 194 195 return elected; 196 } 197 198 /* 199 * Arrive at this barrier, but detach rather than waiting. Returns true if 200 * the caller was the last to detach. 201 */ 202 bool 203 BarrierArriveAndDetach(Barrier *barrier) 204 { 205 return BarrierDetachImpl(barrier, true); 206 } 207 208 /* 209 * Arrive at a barrier, and detach all but the last to arrive. Returns true if 210 * the caller was the last to arrive, and is therefore still attached. 211 */ 212 bool 213 BarrierArriveAndDetachExceptLast(Barrier *barrier) 214 { 215 SpinLockAcquire(&barrier->mutex); 216 if (barrier->participants > 1) 217 { 218 --barrier->participants; 219 SpinLockRelease(&barrier->mutex); 220 221 return false; 222 } 223 Assert(barrier->participants == 1); 224 ++barrier->phase; 225 SpinLockRelease(&barrier->mutex); 226 227 return true; 228 } 229 230 /* 231 * Attach to a barrier. All waiting participants will now wait for this 232 * participant to call BarrierArriveAndWait(), BarrierDetach() or 233 * BarrierArriveAndDetach(). Return the current phase. 234 */ 235 int 236 BarrierAttach(Barrier *barrier) 237 { 238 int phase; 239 240 Assert(!barrier->static_party); 241 242 SpinLockAcquire(&barrier->mutex); 243 ++barrier->participants; 244 phase = barrier->phase; 245 SpinLockRelease(&barrier->mutex); 246 247 return phase; 248 } 249 250 /* 251 * Detach from a barrier. This may release other waiters from 252 * BarrierArriveAndWait() and advance the phase if they were only waiting for 253 * this backend. Return true if this participant was the last to detach. 254 */ 255 bool 256 BarrierDetach(Barrier *barrier) 257 { 258 return BarrierDetachImpl(barrier, false); 259 } 260 261 /* 262 * Return the current phase of a barrier. The caller must be attached. 263 */ 264 int 265 BarrierPhase(Barrier *barrier) 266 { 267 /* 268 * It is OK to read barrier->phase without locking, because it can't 269 * change without us (we are attached to it), and we executed a memory 270 * barrier when we either attached or participated in changing it last 271 * time. 272 */ 273 return barrier->phase; 274 } 275 276 /* 277 * Return an instantaneous snapshot of the number of participants currently 278 * attached to this barrier. For debugging purposes only. 279 */ 280 int 281 BarrierParticipants(Barrier *barrier) 282 { 283 int participants; 284 285 SpinLockAcquire(&barrier->mutex); 286 participants = barrier->participants; 287 SpinLockRelease(&barrier->mutex); 288 289 return participants; 290 } 291 292 /* 293 * Detach from a barrier. If 'arrive' is true then also increment the phase 294 * if there are no other participants. If there are other participants 295 * waiting, then the phase will be advanced and they'll be released if they 296 * were only waiting for the caller. Return true if this participant was the 297 * last to detach. 298 */ 299 static inline bool 300 BarrierDetachImpl(Barrier *barrier, bool arrive) 301 { 302 bool release; 303 bool last; 304 305 Assert(!barrier->static_party); 306 307 SpinLockAcquire(&barrier->mutex); 308 Assert(barrier->participants > 0); 309 --barrier->participants; 310 311 /* 312 * If any other participants are waiting and we were the last participant 313 * waited for, release them. If no other participants are waiting, but 314 * this is a BarrierArriveAndDetach() call, then advance the phase too. 315 */ 316 if ((arrive || barrier->participants > 0) && 317 barrier->arrived == barrier->participants) 318 { 319 release = true; 320 barrier->arrived = 0; 321 ++barrier->phase; 322 } 323 else 324 release = false; 325 326 last = barrier->participants == 0; 327 SpinLockRelease(&barrier->mutex); 328 329 if (release) 330 ConditionVariableBroadcast(&barrier->condition_variable); 331 332 return last; 333 } 334