/*-------------------------------------------------------------------------
 *
 * barrier.c
 *	  Barriers for synchronizing cooperating processes.
 *
 * Portions Copyright (c) 1996-2018, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 * From Wikipedia[1]: "In parallel computing, a barrier is a type of
 * synchronization method.  A barrier for a group of threads or processes in
 * the source code means any thread/process must stop at this point and cannot
 * proceed until all other threads/processes reach this barrier."
 *
 * This implementation of barriers allows for static sets of participants
 * known up front, or dynamic sets of participants which processes can join or
 * leave at any time.  In the dynamic case, a phase number can be used to
 * track progress through a parallel algorithm, and may be necessary to
 * synchronize with the current phase of a multi-phase algorithm when a new
 * participant joins.  In the static case, the phase number is used
 * internally, but it isn't strictly necessary for client code to access it
 * because the phase can only advance when the declared number of participants
 * reaches the barrier, so client code should be in no doubt about the current
 * phase of computation at all times.
 *
 * Consider a parallel algorithm that involves separate phases of computation
 * A, B and C where the output of each phase is needed before the next phase
 * can begin.
 *
 * In the case of a static barrier initialized with 4 participants, each
 * participant works on phase A, then calls BarrierArriveAndWait to wait until
 * all 4 participants have reached that point.  When BarrierArriveAndWait
 * returns control, each participant can work on B, and so on.  Because the
 * barrier knows how many participants to expect, the phases of computation
 * don't need labels or numbers, since each process's program counter implies
 * the current phase.  Even if some of the processes are slow to start up and
 * begin running phase A, the other participants are expecting them and will
 * patiently wait at the barrier.  The code could be written as follows:
 *
 *     perform_a();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_b();
 *     BarrierArriveAndWait(&barrier, ...);
 *     perform_c();
 *     BarrierArriveAndWait(&barrier, ...);
 *
 * If the number of participants is not known up front, then a dynamic barrier
 * is needed and the number should be set to zero at initialization.  New
 * complications arise because the number necessarily changes over time as
 * participants attach and detach, and therefore phases B, C or even the end
 * of processing may be reached before any given participant has started
 * running and attached.  Therefore the client code must perform an initial
 * test of the phase number after attaching, because it needs to find out
 * which phase of the algorithm has been reached by any participants that are
 * already attached in order to synchronize with that work.  Once the program
 * counter or some other representation of current progress is synchronized
 * with the barrier's phase, normal control flow can be used just as in the
 * static case.  Our example could be written using a switch statement with
 * cases that fall-through, as follows:
 *
 *     phase = BarrierAttach(&barrier);
 *     switch (phase)
 *     {
 *     case PHASE_A:
 *         perform_a();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_B:
 *         perform_b();
 *         BarrierArriveAndWait(&barrier, ...);
 *     case PHASE_C:
 *         perform_c();
 *         BarrierArriveAndWait(&barrier, ...);
 *     }
 *     BarrierDetach(&barrier);
 *
 * Static barriers behave similarly to POSIX's pthread_barrier_t.  Dynamic
 * barriers behave similarly to Java's java.util.concurrent.Phaser.
 *
 * [1] https://en.wikipedia.org/wiki/Barrier_(computer_science)
 *
 * IDENTIFICATION
 *	  src/backend/storage/ipc/barrier.c
 *
 *-------------------------------------------------------------------------
 */

#include "postgres.h"
#include "storage/barrier.h"

static inline bool BarrierDetachImpl(Barrier *barrier, bool arrive);

91 /*
92  * Initialize this barrier.  To use a static party size, provide the number of
93  * participants to wait for at each phase indicating that that number of
94  * backends is implicitly attached.  To use a dynamic party size, specify zero
95  * here and then use BarrierAttach() and
96  * BarrierDetach()/BarrierArriveAndDetach() to register and deregister
97  * participants explicitly.
98  */
99 void
BarrierInit(Barrier * barrier,int participants)100 BarrierInit(Barrier *barrier, int participants)
101 {
102 	SpinLockInit(&barrier->mutex);
103 	barrier->participants = participants;
104 	barrier->arrived = 0;
105 	barrier->phase = 0;
106 	barrier->elected = 0;
107 	barrier->static_party = participants > 0;
108 	ConditionVariableInit(&barrier->condition_variable);
109 }
110 
111 /*
112  * Arrive at this barrier, wait for all other attached participants to arrive
113  * too and then return.  Increments the current phase.  The caller must be
114  * attached.
115  *
116  * While waiting, pg_stat_activity shows a wait_event_class and wait_event
117  * controlled by the wait_event_info passed in, which should be a value from
118  * from one of the WaitEventXXX enums defined in pgstat.h.
119  *
120  * Return true in one arbitrarily chosen participant.  Return false in all
121  * others.  The return code can be used to elect one participant to execute a
122  * phase of work that must be done serially while other participants wait.
123  */
124 bool
BarrierArriveAndWait(Barrier * barrier,uint32 wait_event_info)125 BarrierArriveAndWait(Barrier *barrier, uint32 wait_event_info)
126 {
127 	bool		release = false;
128 	bool		elected;
129 	int			start_phase;
130 	int			next_phase;
131 
132 	SpinLockAcquire(&barrier->mutex);
133 	start_phase = barrier->phase;
134 	next_phase = start_phase + 1;
135 	++barrier->arrived;
136 	if (barrier->arrived == barrier->participants)
137 	{
138 		release = true;
139 		barrier->arrived = 0;
140 		barrier->phase = next_phase;
141 		barrier->elected = next_phase;
142 	}
143 	SpinLockRelease(&barrier->mutex);
144 
145 	/*
146 	 * If we were the last expected participant to arrive, we can release our
147 	 * peers and return true to indicate that this backend has been elected to
148 	 * perform any serial work.
149 	 */
150 	if (release)
151 	{
152 		ConditionVariableBroadcast(&barrier->condition_variable);
153 
154 		return true;
155 	}
156 
157 	/*
158 	 * Otherwise we have to wait for the last participant to arrive and
159 	 * advance the phase.
160 	 */
161 	elected = false;
162 	ConditionVariablePrepareToSleep(&barrier->condition_variable);
163 	for (;;)
164 	{
165 		/*
166 		 * We know that phase must either be start_phase, indicating that we
167 		 * need to keep waiting, or next_phase, indicating that the last
168 		 * participant that we were waiting for has either arrived or detached
169 		 * so that the next phase has begun.  The phase cannot advance any
170 		 * further than that without this backend's participation, because
171 		 * this backend is attached.
172 		 */
173 		SpinLockAcquire(&barrier->mutex);
174 		Assert(barrier->phase == start_phase || barrier->phase == next_phase);
175 		release = barrier->phase == next_phase;
176 		if (release && barrier->elected != next_phase)
177 		{
178 			/*
179 			 * Usually the backend that arrives last and releases the other
180 			 * backends is elected to return true (see above), so that it can
181 			 * begin processing serial work while it has a CPU timeslice.
182 			 * However, if the barrier advanced because someone detached, then
183 			 * one of the backends that is awoken will need to be elected.
184 			 */
185 			barrier->elected = barrier->phase;
186 			elected = true;
187 		}
188 		SpinLockRelease(&barrier->mutex);
189 		if (release)
190 			break;
191 		ConditionVariableSleep(&barrier->condition_variable, wait_event_info);
192 	}
193 	ConditionVariableCancelSleep();
194 
195 	return elected;
196 }
197 
198 /*
199  * Arrive at this barrier, but detach rather than waiting.  Returns true if
200  * the caller was the last to detach.
201  */
202 bool
BarrierArriveAndDetach(Barrier * barrier)203 BarrierArriveAndDetach(Barrier *barrier)
204 {
205 	return BarrierDetachImpl(barrier, true);
206 }
207 
208 /*
209  * Attach to a barrier.  All waiting participants will now wait for this
210  * participant to call BarrierArriveAndWait(), BarrierDetach() or
211  * BarrierArriveAndDetach().  Return the current phase.
212  */
213 int
BarrierAttach(Barrier * barrier)214 BarrierAttach(Barrier *barrier)
215 {
216 	int			phase;
217 
218 	Assert(!barrier->static_party);
219 
220 	SpinLockAcquire(&barrier->mutex);
221 	++barrier->participants;
222 	phase = barrier->phase;
223 	SpinLockRelease(&barrier->mutex);
224 
225 	return phase;
226 }
227 
228 /*
229  * Detach from a barrier.  This may release other waiters from BarrierWait and
230  * advance the phase if they were only waiting for this backend.  Return true
231  * if this participant was the last to detach.
232  */
233 bool
BarrierDetach(Barrier * barrier)234 BarrierDetach(Barrier *barrier)
235 {
236 	return BarrierDetachImpl(barrier, false);
237 }
238 
239 /*
240  * Return the current phase of a barrier.  The caller must be attached.
241  */
242 int
BarrierPhase(Barrier * barrier)243 BarrierPhase(Barrier *barrier)
244 {
245 	/*
246 	 * It is OK to read barrier->phase without locking, because it can't
247 	 * change without us (we are attached to it), and we executed a memory
248 	 * barrier when we either attached or participated in changing it last
249 	 * time.
250 	 */
251 	return barrier->phase;
252 }
253 
254 /*
255  * Return an instantaneous snapshot of the number of participants currently
256  * attached to this barrier.  For debugging purposes only.
257  */
258 int
BarrierParticipants(Barrier * barrier)259 BarrierParticipants(Barrier *barrier)
260 {
261 	int			participants;
262 
263 	SpinLockAcquire(&barrier->mutex);
264 	participants = barrier->participants;
265 	SpinLockRelease(&barrier->mutex);
266 
267 	return participants;
268 }
269 
270 /*
271  * Detach from a barrier.  If 'arrive' is true then also increment the phase
272  * if there are no other participants.  If there are other participants
273  * waiting, then the phase will be advanced and they'll be released if they
274  * were only waiting for the caller.  Return true if this participant was the
275  * last to detach.
276  */
277 static inline bool
BarrierDetachImpl(Barrier * barrier,bool arrive)278 BarrierDetachImpl(Barrier *barrier, bool arrive)
279 {
280 	bool		release;
281 	bool		last;
282 
283 	Assert(!barrier->static_party);
284 
285 	SpinLockAcquire(&barrier->mutex);
286 	Assert(barrier->participants > 0);
287 	--barrier->participants;
288 
289 	/*
290 	 * If any other participants are waiting and we were the last participant
291 	 * waited for, release them.  If no other participants are waiting, but
292 	 * this is a BarrierArriveAndDetach() call, then advance the phase too.
293 	 */
294 	if ((arrive || barrier->participants > 0) &&
295 		barrier->arrived == barrier->participants)
296 	{
297 		release = true;
298 		barrier->arrived = 0;
299 		++barrier->phase;
300 	}
301 	else
302 		release = false;
303 
304 	last = barrier->participants == 0;
305 	SpinLockRelease(&barrier->mutex);
306 
307 	if (release)
308 		ConditionVariableBroadcast(&barrier->condition_variable);
309 
310 	return last;
311 }
312