1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 2011-2018. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22  * Description: Thread progress information. Used by lock free algorithms
23  *              to determine when all involved threads are guaranteed to
24  *              have passed a specific point of execution.
25  *
26  *              Usage instructions below.
27  *
28  * Author: 	Rickard Green
29  */
30 
31 /*
32  * ------ Usage instructions -----------------------------------------------
33  *
34  * This module keeps track of the progress of a set of managed threads. Only
35  * threads that behave well can be allowed to be managed. A managed thread
36  * should update its thread progress frequently. Currently only scheduler
37  * threads, the system-message-dispatcher threads, and the aux-thread are
38  * managed threads. We typically do not want any async threads as managed
39  * threads, since they execute user-implemented driver code that is assumed
40  * to be time consuming and therefore cannot guarantee frequent updates of
41  * their thread progress.
42  *
43  * erts_thr_progress_current() returns the global current thread progress
44  * value of managed threads. I.e., the latest progress value that all
45  * managed threads have reached. Thread progress values are opaque.
46  *
47  * erts_thr_progress_has_reached(VAL) returns a value != 0 if current
48  * global thread progress has reached or passed VAL.
49  *
50  * erts_thr_progress_later() returns a thread progress value in the future
51  * which no managed thread has yet reached.
52  *
53  * All threads issue a full memory barrier when reaching a new thread
54  * progress value. They only reach new thread progress values in specific
55  * controlled states when calling erts_thr_progress_update(). Schedulers
56  * call erts_thr_progress_update() in between execution of processes,
57  * when going to sleep and when waking up.
58  *
59  * Sleeping managed threads are considered to have reached the next thread
60  * progress value immediately. They are not woken and therefore do not
61  * issue any memory barriers when reaching a new thread progress value.
62  * A sleeping thread does, however, immediately issue a memory barrier upon
63  * wakeup.
64  *
65  * Both managed and registered unmanaged threads may request wakeup when
66  * the global thread progress reaches a certain value using
67  * erts_thr_progress_wakeup().
68  *
69  * Note that thread progress values are opaque, and that you are only
70  * allowed to use thread progress values retrieved from this API!
71  *
72  * -------------------------------------------------------------------------
73  */
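
/*
 * A minimal usage sketch (illustrative only, not part of this module):
 * a managed thread that wants to free a data structure once all managed
 * threads are guaranteed to no longer reference it could do roughly the
 * following. The helpers unlink_the_structure()/free_the_structure() are
 * hypothetical and the call signatures are simplified; see
 * erl_thr_progress.h for the real declarations.
 *
 *     ErtsThrPrgrVal later = erts_thr_progress_later(esdp);
 *     unlink_the_structure(ptr);
 *     erts_thr_progress_wakeup(tpd, later);
 *     ...
 *     // when woken up again:
 *     if (erts_thr_progress_has_reached(later))
 *         free_the_structure(ptr);
 */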
74 
75 #ifdef HAVE_CONFIG_H
76 #  include "config.h"
77 #endif
78 
79 #include <stddef.h> /* offsetof() */
80 #include "erl_thr_progress.h"
81 #include "global.h"
82 
83 
84 #define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 0
85 
86 #ifdef DEBUG
87 #undef ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
88 #define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 1
89 #endif
90 
91 #define ERTS_THR_PRGR_PRINT_LEADER 0
92 #define ERTS_THR_PRGR_PRINT_VAL 0
93 #define ERTS_THR_PRGR_PRINT_BLOCKERS 0
94 
95 #define ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL 100
96 
97 #define ERTS_THR_PRGR_LFLG_BLOCK	((erts_aint32_t) (1U << 31))
98 #define ERTS_THR_PRGR_LFLG_NO_LEADER	((erts_aint32_t) (1U << 30))
99 #define ERTS_THR_PRGR_LFLG_WAITING_UM	((erts_aint32_t) (1U << 29))
100 #define ERTS_THR_PRGR_LFLG_ACTIVE_MASK	(~(ERTS_THR_PRGR_LFLG_NO_LEADER	\
101 					   | ERTS_THR_PRGR_LFLG_BLOCK	\
102 					   | ERTS_THR_PRGR_LFLG_WAITING_UM))
103 
104 #define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS)				\
105     ((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK)
106 
107 /*
108  * We use a 64-bit value for thread progress. With such a large value,
109  * wrapping of the thread progress will more or less never occur.
110  *
111  * On 32-bit systems we therefore need a double-word atomic.
112  */
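
/*
 * Rough arithmetic behind the claim above (illustrative): even if the
 * global progress value were bumped once every microsecond, a 64-bit
 * counter would take about 2^64 / 10^6 seconds, i.e. on the order of
 * half a million years, to wrap around.
 */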
113 #undef read_acqb
114 #define read_acqb erts_thr_prgr_read_acqb__
115 #undef read_nob
116 #define read_nob erts_thr_prgr_read_nob__
117 
118 static ERTS_INLINE void
119 set_mb(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
120 {
121     erts_atomic64_set_mb(atmc, (erts_aint64_t) val);
122 }
123 
124 static ERTS_INLINE void
125 set_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
126 {
127     erts_atomic64_set_nob(atmc, (erts_aint64_t) val);
128 }
129 
130 static ERTS_INLINE void
131 init_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
132 {
133     erts_atomic64_init_nob(atmc, (erts_aint64_t) val);
134 }
135 
136 /* #define ERTS_THR_PROGRESS_STATE_DEBUG */
137 
138 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
139 
140 #ifdef __GNUC__
141 #warning "Thread progress state debug is on"
142 #endif
143 
144 #define ERTS_THR_PROGRESS_STATE_DEBUG_LEADER	((erts_aint32_t) (1U << 0))
145 #define ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE	((erts_aint32_t) (1U << 1))
146 
147 #define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID)						\
148     erts_atomic32_init_nob(&intrnl->thr[(ID)].data.state_debug,				\
149 			   ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE)
150 
151 #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON)				\
152 do {											\
153     erts_aint32_t state_debug__;							\
154     state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug);	\
155     if ((ON))										\
156 	state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE;				\
157     else										\
158 	state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE;				\
159     erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__);		\
160 } while (0)
161 
162 #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON)				\
163 do {											\
164     erts_aint32_t state_debug__;							\
165     state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug);	\
166     if ((ON))										\
167 	state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_LEADER;				\
168     else										\
169 	state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_LEADER;				\
170     erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__);		\
171 } while (0)
172 
173 #else
174 
175 #define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID)
176 #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON)
177 #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON)
178 
179 #endif /* ERTS_THR_PROGRESS_STATE_DEBUG */
180 
181 #define ERTS_THR_PRGR_BLCKR_INVALID        ((erts_aint32_t) (~0U))
182 #define ERTS_THR_PRGR_BLCKR_UNMANAGED      ((erts_aint32_t) (1U << 31))
183 
184 #define ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING  ((erts_aint32_t) (1U << 31))
185 
186 #define ERTS_THR_PRGR_BM_BITS 32
187 #define ERTS_THR_PRGR_BM_SHIFT 5
188 #define ERTS_THR_PRGR_BM_MASK 0x1f
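
/*
 * The bitmap macros above describe the two-level bitmap used for unmanaged
 * wakeup requests: an unmanaged thread id selects a bit in one of the "low"
 * words, and each low word in turn corresponds to one bit in a "high" word
 * (see request_wakeup_unmanaged() and wakeup_unmanaged_threads()).
 * Worked example (illustrative): id 70 sets bit 70 & 0x1f = 6 in low word
 * 70 >> 5 = 2, and low word 2 is flagged as bit 2 of high word 0.
 */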
189 
190 #define ERTS_THR_PRGR_WAKEUP_DATA_MASK (ERTS_THR_PRGR_WAKEUP_DATA_SIZE - 1)
191 
192 #define ERTS_THR_PRGR_WAKEUP_IX(V) \
193     ((int) ((V) & ERTS_THR_PRGR_WAKEUP_DATA_MASK))
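
/*
 * Wakeup requests are kept in a small ring of ERTS_THR_PRGR_WAKEUP_DATA_SIZE
 * slots, and ERTS_THR_PRGR_WAKEUP_IX(V) picks the slot for progress value V
 * by masking with (size - 1); this assumes the size is a power of two.
 * Outstanding requests are confined to a small window just ahead of the
 * current progress value (see the debug check in has_reached_wakeup()), so
 * as long as that window is smaller than the ring, pending requests for
 * different values do not alias to the same slot.
 */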
194 
195 typedef struct {
196     erts_atomic32_t len;
197     int id[1];
198 } ErtsThrPrgrManagedWakeupData;
199 
200 typedef struct {
201     erts_atomic32_t len;
202     int high_sz;
203     int low_sz;
204     erts_atomic32_t *high;
205     erts_atomic32_t *low;
206 } ErtsThrPrgrUnmanagedWakeupData;
207 
208 typedef struct {
209     erts_atomic32_t lflgs;
210     erts_atomic32_t block_count;
211     erts_atomic_t blocker_event;
212     erts_atomic32_t pref_wakeup_used;
213     erts_atomic32_t managed_count;
214     erts_atomic32_t managed_id;
215     erts_atomic32_t unmanaged_id;
216     int chk_next_ix;
217     struct {
218 	int waiting;
219 	erts_atomic32_t current;
220     } umrefc_ix;
221 } ErtsThrPrgrMiscData;
222 
223 typedef struct {
224     ERTS_THR_PRGR_ATOMIC current;
225 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
226     erts_atomic32_t state_debug;
227 #endif
228 } ErtsThrPrgrElement;
229 
230 typedef union {
231     ErtsThrPrgrElement data;
232     char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrPrgrElement))];
233 } ErtsThrPrgrArray;
234 
235 typedef union {
236     erts_atomic_t refc;
237     char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_atomic_t))];
238 } ErtsThrPrgrUnmanagedRefc;
239 
240 typedef struct {
241     union {
242 	ErtsThrPrgrMiscData data;
243 	char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
244 		sizeof(ErtsThrPrgrMiscData))];
245     } misc;
246     ErtsThrPrgrUnmanagedRefc umrefc[2];
247     ErtsThrPrgrArray *thr;
248     struct {
249 	int no;
250 	ErtsThrPrgrCallbacks *callbacks;
251 	ErtsThrPrgrManagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
252     } managed;
253     struct {
254 	int no;
255 	ErtsThrPrgrCallbacks *callbacks;
256 	ErtsThrPrgrUnmanagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
257     } unmanaged;
258 } ErtsThrPrgrInternalData;
259 
260 static ErtsThrPrgrInternalData *intrnl;
261 
262 ErtsThrPrgr erts_thr_prgr__;
263 
264 erts_tsd_key_t erts_thr_prgr_data_key__;
265 
266 static void handle_wakeup_requests(ErtsThrPrgrVal current);
267 static int got_sched_wakeups(void);
268 static erts_aint32_t block_thread(ErtsThrPrgrData *tpd);
269 
270 static ERTS_INLINE void
271 wakeup_managed(int id)
272 {
273     ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[id];
274     ASSERT(0 <= id && id < intrnl->managed.no);
275     cbp->wakeup(cbp->arg);
276 }
277 
278 
279 static ERTS_INLINE void
280 wakeup_unmanaged(int id)
281 {
282     ErtsThrPrgrCallbacks *cbp = &intrnl->unmanaged.callbacks[id];
283     ASSERT(0 <= id && id < intrnl->unmanaged.no);
284     cbp->wakeup(cbp->arg);
285 }
286 
287 static ERTS_INLINE ErtsThrPrgrData *
288 perhaps_thr_prgr_data(ErtsSchedulerData *esdp)
289 {
290     if (esdp)
291 	return &esdp->thr_progress_data;
292     else
293 	return erts_tsd_get(erts_thr_prgr_data_key__);
294 }
295 
296 static ERTS_INLINE ErtsThrPrgrData *
297 thr_prgr_data(ErtsSchedulerData *esdp)
298 {
299     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
300     ASSERT(tpd);
301     return tpd;
302 }
303 
304 static void
305 init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
306 {
307     tpd->id = -1;
308     tpd->is_managed = 0;
309     tpd->is_blocking = 0;
310     tpd->is_temporary = 1;
311 #ifdef ERTS_ENABLE_LOCK_CHECK
312     tpd->is_delaying = 0;
313 #endif
314     erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
315 }
316 
317 static ERTS_INLINE ErtsThrPrgrData *
318 tmp_thr_prgr_data(ErtsSchedulerData *esdp)
319 {
320     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
321 
322     if (!tpd) {
323         /*
324          * We only allocate the part up to the wakeup_request field, which is
325          * the first field only used by registered threads.
326          */
327         size_t alloc_size = offsetof(ErtsThrPrgrData, wakeup_request);
328 
329         /* We may land here as a result of unmanaged_delay being called from
330          * the lock counting module, which in turn might be called from within
331          * the allocator, so we use plain malloc to avoid deadlocks. */
332         tpd =
333 #ifdef ERTS_ENABLE_LOCK_COUNT
334             malloc(alloc_size);
335 #else
336             erts_alloc(ERTS_ALC_T_T_THR_PRGR_DATA, alloc_size);
337 #endif
338 
339         init_tmp_thr_prgr_data(tpd);
340     }
341 
342     return tpd;
343 }
344 
345 static ERTS_INLINE void
346 return_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
347 {
348     if (tpd->is_temporary) {
349         erts_tsd_set(erts_thr_prgr_data_key__, NULL);
350 
351 #ifdef ERTS_ENABLE_LOCK_COUNT
352         free(tpd);
353 #else
354         erts_free(ERTS_ALC_T_T_THR_PRGR_DATA, tpd);
355 #endif
356     }
357 }
358 
359 static ERTS_INLINE int
360 block_count_dec(void)
361 {
362     erts_aint32_t block_count;
363     block_count = erts_atomic32_dec_read_mb(&intrnl->misc.data.block_count);
364     if (block_count == 0) {
365 	erts_tse_t *event;
366 	event = ((erts_tse_t*)
367 		 erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
368 	if (event)
369 	    erts_tse_set(event);
370 	return 1;
371     }
372 
373     return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
374 }
375 
376 static ERTS_INLINE int
377 block_count_inc(void)
378 {
379     erts_aint32_t block_count;
380     block_count = erts_atomic32_inc_read_mb(&intrnl->misc.data.block_count);
381     return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
382 }
383 
384 
385 void
386 erts_thr_progress_pre_init(void)
387 {
388     intrnl = NULL;
389     erts_tsd_key_create(&erts_thr_prgr_data_key__,
390 			"erts_thr_prgr_data_key");
391     init_nob(&erts_thr_prgr__.current, ERTS_THR_PRGR_VAL_FIRST);
392 }
393 
394 void
395 erts_thr_progress_init(int no_schedulers, int managed, int unmanaged)
396 {
397     int i, j, um_low, um_high;
398     char *ptr;
399     size_t cb_sz, intrnl_sz, thr_arr_sz, m_wakeup_size, um_wakeup_size,
400 	tot_size;
401 
402     intrnl_sz = sizeof(ErtsThrPrgrInternalData);
403     intrnl_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(intrnl_sz);
404 
405     cb_sz = sizeof(ErtsThrPrgrCallbacks)*(managed+unmanaged);
406     cb_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(cb_sz);
407 
408     thr_arr_sz = sizeof(ErtsThrPrgrArray)*managed;
409     ASSERT(thr_arr_sz == ERTS_ALC_CACHE_LINE_ALIGN_SIZE(thr_arr_sz));
410 
411     m_wakeup_size = sizeof(ErtsThrPrgrManagedWakeupData);
412     m_wakeup_size += (managed - 1)*sizeof(int);
413     m_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(m_wakeup_size);
414 
415     um_low = (unmanaged - 1)/ERTS_THR_PRGR_BM_BITS + 1;
416     um_high = (um_low - 1)/ERTS_THR_PRGR_BM_BITS + 1;
417 
418     um_wakeup_size = sizeof(ErtsThrPrgrUnmanagedWakeupData);
419     um_wakeup_size += (um_high + um_low)*sizeof(erts_atomic32_t);
420     um_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(um_wakeup_size);
421 
422     tot_size = intrnl_sz;
423     tot_size += cb_sz;
424     tot_size += thr_arr_sz;
425     tot_size += m_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE;
426     tot_size += um_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE;
427 
428     ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_THR_PRGR_IDATA,
429 					     tot_size);
430 
431     intrnl = (ErtsThrPrgrInternalData *) ptr;
432     ptr += intrnl_sz;
433 
434     erts_atomic32_init_nob(&intrnl->misc.data.lflgs,
435 			   ERTS_THR_PRGR_LFLG_NO_LEADER);
436     erts_atomic32_init_nob(&intrnl->misc.data.block_count,
437 			   (ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING
438 			    | (erts_aint32_t) managed));
439     erts_atomic_init_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
440     erts_atomic32_init_nob(&intrnl->misc.data.pref_wakeup_used, 0);
441     erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0);
442     erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers);
443     erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1);
444     intrnl->misc.data.chk_next_ix = 0;
445     intrnl->misc.data.umrefc_ix.waiting = -1;
446     erts_atomic32_init_nob(&intrnl->misc.data.umrefc_ix.current, 0);
447 
448     erts_atomic_init_nob(&intrnl->umrefc[0].refc, (erts_aint_t) 0);
449     erts_atomic_init_nob(&intrnl->umrefc[1].refc, (erts_aint_t) 0);
450 
451     intrnl->thr = (ErtsThrPrgrArray *) ptr;
452     ptr += thr_arr_sz;
453     for (i = 0; i < managed; i++)
454 	init_nob(&intrnl->thr[i].data.current, 0);
455 
456     intrnl->managed.callbacks = (ErtsThrPrgrCallbacks *) ptr;
457     intrnl->unmanaged.callbacks = &intrnl->managed.callbacks[managed];
458     ptr += cb_sz;
459 
460     intrnl->managed.no = managed;
461     for (i = 0; i < managed; i++) {
462 	intrnl->managed.callbacks[i].arg = NULL;
463 	intrnl->managed.callbacks[i].wakeup = NULL;
464     }
465 
466     intrnl->unmanaged.no = unmanaged;
467     for (i = 0; i < unmanaged; i++) {
468 	intrnl->unmanaged.callbacks[i].arg = NULL;
469 	intrnl->unmanaged.callbacks[i].wakeup = NULL;
470     }
471 
472     for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
473 	intrnl->managed.data[i] = (ErtsThrPrgrManagedWakeupData *) ptr;
474 	erts_atomic32_init_nob(&intrnl->managed.data[i]->len, 0);
475 	ptr += m_wakeup_size;
476     }
477 
478     for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
479 	erts_atomic32_t *bm;
480 	intrnl->unmanaged.data[i] = (ErtsThrPrgrUnmanagedWakeupData *) ptr;
481 	erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->len, 0);
482 	bm = (erts_atomic32_t *) (ptr + sizeof(ErtsThrPrgrUnmanagedWakeupData));
483 	intrnl->unmanaged.data[i]->high = bm;
484 	intrnl->unmanaged.data[i]->high_sz = um_high;
485 	for (j = 0; j < um_high; j++)
486 	    erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->high[j], 0);
487 	intrnl->unmanaged.data[i]->low
488 	    = &intrnl->unmanaged.data[i]->high[um_high];
489 	intrnl->unmanaged.data[i]->low_sz = um_low;
490 	for (j = 0; j < um_low; j++)
491 	    erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->low[j], 0);
492 	ptr += um_wakeup_size;
493     }
494     ERTS_THR_MEMORY_BARRIER;
495 }
496 
497 static void
498 init_wakeup_request_array(ErtsThrPrgrVal *w)
499 {
500     int i;
501     ErtsThrPrgrVal current;
502 
503     current = read_acqb(&erts_thr_prgr__.current);
504     for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
505 	w[i] = current - ((ErtsThrPrgrVal) (ERTS_THR_PRGR_WAKEUP_DATA_SIZE + i));
506 	if (w[i] > current)
507 	    w[i]--;
508     }
509 }
510 
511 ErtsThrPrgrData *erts_thr_progress_data(void) {
512     return erts_tsd_get(erts_thr_prgr_data_key__);
513 }
514 
515 void
516 erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks)
517 {
518     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
519     int is_blocking = 0;
520 
521     if (tpd) {
522 	if (!tpd->is_temporary)
523 	    erts_exit(ERTS_ABORT_EXIT,
524 		     "%s:%d:%s(): Double register of thread\n",
525 		     __FILE__, __LINE__, __func__);
526 	is_blocking = tpd->is_blocking;
527 	return_tmp_thr_prgr_data(tpd);
528     }
529 
530     /*
531      * We only allocate the part up to the leader field,
532      * which is the first field only used by managed threads.
533      */
534     tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA,
535 		     offsetof(ErtsThrPrgrData, leader));
536     tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->misc.data.unmanaged_id);
537     tpd->is_managed = 0;
538     tpd->is_blocking = is_blocking;
539     tpd->is_temporary = 0;
540 #ifdef ERTS_ENABLE_LOCK_CHECK
541     tpd->is_delaying = 0;
542 #endif
543     ASSERT(tpd->id >= 0);
544     if (tpd->id >= intrnl->unmanaged.no)
545 	erts_exit(ERTS_ABORT_EXIT,
546 		 "%s:%d:%s(): Too many unmanaged registered threads\n",
547 		 __FILE__, __LINE__, __func__);
548 
549     init_wakeup_request_array(&tpd->wakeup_request[0]);
550     erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
551 
552     ASSERT(callbacks->wakeup);
553 
554     intrnl->unmanaged.callbacks[tpd->id] = *callbacks;
555 }
556 
557 
558 ErtsThrPrgrData *
559 erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
560 					  ErtsThrPrgrCallbacks *callbacks,
561 					  int pref_wakeup,
562                                           int deep_sleeper)
563 {
564     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
565     int is_blocking = 0, managed;
566 
567     if (tpd) {
568 	if (!tpd->is_temporary)
569 	    erts_exit(ERTS_ABORT_EXIT,
570 		     "%s:%d:%s(): Double register of thread\n",
571 		     __FILE__, __LINE__, __func__);
572 	is_blocking = tpd->is_blocking;
573 	return_tmp_thr_prgr_data(tpd);
574     }
575 
576     if (esdp)
577 	tpd = &esdp->thr_progress_data;
578     else
579 	tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, sizeof(ErtsThrPrgrData));
580 
581     if (pref_wakeup
582 	&& !erts_atomic32_xchg_nob(&intrnl->misc.data.pref_wakeup_used, 1))
583 	tpd->id = 0;
584     else if (esdp)
585 	tpd->id = (int) esdp->no;
586     else
587 	tpd->id = erts_atomic32_inc_read_nob(&intrnl->misc.data.managed_id);
588     ASSERT(tpd->id >= 0);
589     if (tpd->id >= intrnl->managed.no)
590 	erts_exit(ERTS_ABORT_EXIT,
591 		 "%s:%d:%s(): Too many managed registered threads\n",
592 		 __FILE__, __LINE__, __func__);
593 
594     tpd->is_managed = 1;
595     tpd->is_blocking = is_blocking;
596     tpd->is_temporary = 0;
597     tpd->is_deep_sleeper = deep_sleeper;
598 #ifdef ERTS_ENABLE_LOCK_CHECK
599     tpd->is_delaying = 1;
600 #endif
601 
602     init_wakeup_request_array(&tpd->wakeup_request[0]);
603 
604     ERTS_THR_PROGRESS_STATE_DEBUG_INIT(tpd->id);
605 
606     tpd->leader = 0;
607     tpd->active = 1;
608     tpd->confirmed = 0;
609     tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
610     erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
611 
612     erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
613 
614     ASSERT(callbacks->wakeup);
615     ASSERT(callbacks->prepare_wait);
616     ASSERT(callbacks->wait);
617     ASSERT(callbacks->finalize_wait);
618 
619     intrnl->managed.callbacks[tpd->id] = *callbacks;
620 
621     callbacks->prepare_wait(callbacks->arg);
622     managed = erts_atomic32_inc_read_relb(&intrnl->misc.data.managed_count);
623     if (managed != intrnl->managed.no) {
624 	/* Wait until all managed threads have registered... */
625 	do {
626 	    callbacks->wait(callbacks->arg);
627 	    callbacks->prepare_wait(callbacks->arg);
628 	    managed = erts_atomic32_read_acqb(&intrnl->misc.data.managed_count);
629 	} while (managed != intrnl->managed.no);
630     }
631     else {
632 	int id;
633 	/* All managed threads have registered; let's go... */
634 	for (id = 0; id < managed; id++)
635 	    if (id != tpd->id)
636 		wakeup_managed(id);
637     }
638     callbacks->finalize_wait(callbacks->arg);
639     return tpd;
640 }
641 
642 static ERTS_INLINE int
643 leader_update(ErtsThrPrgrData *tpd)
644 {
645 #ifdef ERTS_ENABLE_LOCK_CHECK
646     erts_lc_check_exact(NULL, 0);
647 #endif
648     if (!tpd->leader) {
649 	/* Probably need to block... */
650 	block_thread(tpd);
651     }
652     else {
653 	ErtsThrPrgrVal current;
654 	int ix, chk_next_ix, umrefc_ix, my_ix, no_managed, waiting_unmanaged;
655 	erts_aint32_t lflgs;
656 	ErtsThrPrgrVal next;
657 	erts_aint_t refc;
658 
659 	my_ix = tpd->id;
660 
661 	if (tpd->leader_state.current == ERTS_THR_PRGR_VAL_WAITING) {
662 	    /* Took over as leader from another thread */
663 	    tpd->leader_state.current = read_nob(&erts_thr_prgr__.current);
664 	    tpd->leader_state.next = tpd->leader_state.current;
665 	    tpd->leader_state.next++;
666 	    if (tpd->leader_state.next == ERTS_THR_PRGR_VAL_WAITING)
667 		tpd->leader_state.next = 0;
668 	    tpd->leader_state.chk_next_ix = intrnl->misc.data.chk_next_ix;
669 	    tpd->leader_state.umrefc_ix.waiting = intrnl->misc.data.umrefc_ix.waiting;
670 	    tpd->leader_state.umrefc_ix.current =
671 		(int) erts_atomic32_read_nob(&intrnl->misc.data.umrefc_ix.current);
672 
673 	    if (tpd->confirmed == tpd->leader_state.current) {
674 		ErtsThrPrgrVal val = tpd->leader_state.current + 1;
675 		if (val == ERTS_THR_PRGR_VAL_WAITING)
676 		    val = 0;
677 		tpd->confirmed = val;
678 		set_mb(&intrnl->thr[my_ix].data.current, val);
679 	    }
680 	}
681 
682 
683 	next = tpd->leader_state.next;
684 
685 	waiting_unmanaged = 0;
686 	umrefc_ix = -1; /* Shut up annoying warning */
687 
688 	chk_next_ix = tpd->leader_state.chk_next_ix;
689 	no_managed = intrnl->managed.no;
690 	ASSERT(0 <= chk_next_ix && chk_next_ix <= no_managed);
691 	/* Check managed threads */
692 	if (chk_next_ix < no_managed) {
693 	    for (ix = chk_next_ix; ix < no_managed; ix++) {
694 		ErtsThrPrgrVal tmp;
695 		if (ix == my_ix)
696 		    continue;
697 		tmp = read_nob(&intrnl->thr[ix].data.current);
698 		if (tmp != next && tmp != ERTS_THR_PRGR_VAL_WAITING) {
699 		    tpd->leader_state.chk_next_ix = ix;
700 		    ASSERT(erts_thr_progress_has_passed__(next, tmp));
701 		    goto done;
702 		}
703 	    }
704 	}
705 
706 	/* Check unmanaged threads */
707 	waiting_unmanaged = tpd->leader_state.umrefc_ix.waiting != -1;
708 	umrefc_ix = (waiting_unmanaged
709 		     ? tpd->leader_state.umrefc_ix.waiting
710 		     : tpd->leader_state.umrefc_ix.current);
711 	refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
712 	ASSERT(refc >= 0);
713 	if (refc != 0) {
714 	    int new_umrefc_ix;
715 
716 	    if (waiting_unmanaged)
717 		goto done;
718 
719 	    new_umrefc_ix = (umrefc_ix + 1) & 0x1;
720 	    tpd->leader_state.umrefc_ix.waiting = umrefc_ix;
721 	    tpd->leader_state.chk_next_ix = no_managed;
722 	    erts_atomic32_set_nob(&intrnl->misc.data.umrefc_ix.current,
723 				  (erts_aint32_t) new_umrefc_ix);
724 	    tpd->leader_state.umrefc_ix.current = new_umrefc_ix;
725 	    ETHR_MEMBAR(ETHR_StoreLoad);
726 	    refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
727 	    ASSERT(refc >= 0);
728 	    waiting_unmanaged = 1;
729 	    if (refc != 0)
730 		goto done;
731 	}
732 
733 	/* Make progress */
734 	current = next;
735 
736 	next++;
737 	if (next == ERTS_THR_PRGR_VAL_WAITING)
738 	    next = 0;
739 
740 	set_nob(&intrnl->thr[my_ix].data.current, next);
741 	set_mb(&erts_thr_prgr__.current, current);
742 	tpd->confirmed = next;
743 	tpd->leader_state.next = next;
744 	tpd->leader_state.current = current;
745 
746 #if ERTS_THR_PRGR_PRINT_VAL
747 	if (current % 1000 == 0)
748 	    erts_fprintf(stderr, "%b64u\n", current);
749 #endif
750 	handle_wakeup_requests(current);
751 
752 	if (waiting_unmanaged) {
753 	    waiting_unmanaged = 0;
754 	    tpd->leader_state.umrefc_ix.waiting = -1;
755 	    erts_atomic32_read_band_nob(&intrnl->misc.data.lflgs,
756 					~ERTS_THR_PRGR_LFLG_WAITING_UM);
757 	}
758 	tpd->leader_state.chk_next_ix = 0;
759 
760     done:
761 
762 	if (tpd->active) {
763 	    lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
764 	    if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
765 		(void) block_thread(tpd);
766 	}
767 	else {
768 	    erts_aint32_t set_flags = ERTS_THR_PRGR_LFLG_NO_LEADER;
769 	    tpd->leader = 0;
770 	    tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
771 #if ERTS_THR_PRGR_PRINT_LEADER
772 	    erts_fprintf(stderr, "L <- %d\n", tpd->id);
773 #endif
774 
775 	    ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0);
776 
777 	    intrnl->misc.data.umrefc_ix.waiting
778 		= tpd->leader_state.umrefc_ix.waiting;
779 	    if (waiting_unmanaged)
780 		set_flags |= ERTS_THR_PRGR_LFLG_WAITING_UM;
781 
782 	    lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs,
783 						set_flags);
784 	    lflgs |= set_flags;
785 	    if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
786 		lflgs = block_thread(tpd);
787 
788 	    if (waiting_unmanaged) {
789 		/* Need to check umrefc again */
790 		ETHR_MEMBAR(ETHR_StoreLoad);
791 		refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
792 		if (refc == 0 && got_sched_wakeups()) {
793                     /* Someone needs to make progress */
794                     wakeup_managed(tpd->id);
795 		}
796 	    }
797 	}
798     }
799 
800     return tpd->leader;
801 }
802 
803 static int
804 update(ErtsThrPrgrData *tpd)
805 {
806     int res;
807     ErtsThrPrgrVal val;
808 
809     if (tpd->leader)
810 	res = 1;
811     else {
812 	erts_aint32_t lflgs;
813 	res = 0;
814 	val = read_acqb(&erts_thr_prgr__.current);
815 	if (tpd->confirmed == val) {
816 	    val++;
817 	    if (val == ERTS_THR_PRGR_VAL_WAITING)
818 		val = 0;
819 	    tpd->confirmed = val;
820 	    set_mb(&intrnl->thr[tpd->id].data.current, val);
821 	}
822 
823 	lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
824 	if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
825 	    res = 1; /* Need to block in leader_update() */
826 
827 	if ((lflgs & ERTS_THR_PRGR_LFLG_NO_LEADER)
828 	    && (tpd->active || ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0)) {
829 	    /* Try to take over leadership... */
830 	    erts_aint32_t olflgs;
831 	    olflgs = erts_atomic32_read_band_acqb(
832 		&intrnl->misc.data.lflgs,
833 		~ERTS_THR_PRGR_LFLG_NO_LEADER);
834 	    if (olflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) {
835 		tpd->leader = 1;
836 #if ERTS_THR_PRGR_PRINT_LEADER
837 		erts_fprintf(stderr, "L -> %d\n", tpd->id);
838 #endif
839 		ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 1);
840 	    }
841 	}
842 	res |= tpd->leader;
843     }
844     return res;
845 }
846 
847 int
848 erts_thr_progress_update(ErtsThrPrgrData *tpd)
849 {
850     return update(tpd);
851 }
852 
853 
854 int
855 erts_thr_progress_leader_update(ErtsThrPrgrData *tpd)
856 {
857     return leader_update(tpd);
858 }
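
/*
 * A sketch of how callers are expected to pair the two functions above
 * (illustrative; the real call sites are in the scheduler loop and in
 * erts_thr_progress_finalize_wait() below):
 *
 *     if (erts_thr_progress_update(tpd))
 *         erts_thr_progress_leader_update(tpd);
 *
 * update() returns nonzero when this thread is (or has just become) the
 * leader, or when a block is pending, in which case leader_update() must
 * be called so that the thread either drives the global progress forward
 * or blocks.
 */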
859 
860 void
861 erts_thr_progress_prepare_wait(ErtsThrPrgrData *tpd)
862 {
863     erts_aint32_t lflgs;
864 
865 #ifdef ERTS_ENABLE_LOCK_CHECK
866     erts_lc_check_exact(NULL, 0);
867 #endif
868 
869     block_count_dec();
870 
871     tpd->confirmed = ERTS_THR_PRGR_VAL_WAITING;
872     set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING);
873 
874     lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
875 
876     if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
877 		  | ERTS_THR_PRGR_LFLG_WAITING_UM
878 		  | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
879 	== ERTS_THR_PRGR_LFLG_NO_LEADER
880 	&& got_sched_wakeups()) {
881 	/* Someone needs to make progress */
882         if (tpd->is_deep_sleeper)
883             wakeup_managed(1);
884         else
885             wakeup_managed(tpd->id);
886     }
887 }
888 
889 void
890 erts_thr_progress_finalize_wait(ErtsThrPrgrData *tpd)
891 {
892     ErtsThrPrgrVal current, val;
893 
894 #ifdef ERTS_ENABLE_LOCK_CHECK
895     erts_lc_check_exact(NULL, 0);
896 #endif
897 
898     /*
899      * We aren't allowed to continue until our thread
900      * progress is past global current.
901      */
902     val = current = read_acqb(&erts_thr_prgr__.current);
903     while (1) {
904 	val++;
905 	if (val == ERTS_THR_PRGR_VAL_WAITING)
906 	    val = 0;
907 	tpd->confirmed = val;
908 	set_mb(&intrnl->thr[tpd->id].data.current, val);
909 	val = read_acqb(&erts_thr_prgr__.current);
910 	if (current == val)
911 	    break;
912 	current = val;
913     }
914     if (block_count_inc())
915 	block_thread(tpd);
916     if (update(tpd))
917 	leader_update(tpd);
918 }
919 
920 void
921 erts_thr_progress_active(ErtsThrPrgrData *tpd, int on)
922 {
923 
924 #ifdef ERTS_ENABLE_LOCK_CHECK
925     erts_lc_check_exact(NULL, 0);
926 #endif
927 
928     ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(tpd->id, on);
929 
930     if (on) {
931 	ASSERT(!tpd->active);
932 	tpd->active = 1;
933 	erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
934     }
935     else {
936 	ASSERT(tpd->active);
937 	tpd->active = 0;
938 	erts_atomic32_dec_nob(&intrnl->misc.data.lflgs);
939 	if (update(tpd))
940 	    leader_update(tpd);
941     }
942 
943 #ifdef DEBUG
944     {
945 	erts_aint32_t n = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
946 	n &= ERTS_THR_PRGR_LFLG_ACTIVE_MASK;
947 	ASSERT(tpd->active <= n && n <= intrnl->managed.no);
948     }
949 #endif
950 
951 }
952 
953 static ERTS_INLINE void
954 unmanaged_continue(ErtsThrPrgrDelayHandle handle)
955 {
956     int umrefc_ix = (int) handle;
957     erts_aint_t refc;
958 
959     ASSERT(umrefc_ix == 0 || umrefc_ix == 1);
960     refc = erts_atomic_dec_read_relb(&intrnl->umrefc[umrefc_ix].refc);
961     ASSERT(refc >= 0);
962     if (refc == 0) {
963 	erts_aint_t lflgs;
964 	ERTS_THR_READ_MEMORY_BARRIER;
965 	lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
966 	if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
967 		      | ERTS_THR_PRGR_LFLG_WAITING_UM
968 		      | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
969 	    == (ERTS_THR_PRGR_LFLG_NO_LEADER|ERTS_THR_PRGR_LFLG_WAITING_UM)
970 	    && got_sched_wakeups()) {
971 	    /* Others waiting for us... */
972 	    wakeup_managed(1);
973 	}
974     }
975 }
976 
977 void
978 erts_thr_progress_unmanaged_continue__(ErtsThrPrgrDelayHandle handle)
979 {
980 #ifdef ERTS_ENABLE_LOCK_CHECK
981     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
982     ERTS_LC_ASSERT(tpd && tpd->is_delaying);
983     tpd->is_delaying--;
984     ASSERT(tpd->is_delaying >= 0);
985     if (!tpd->is_delaying)
986 	return_tmp_thr_prgr_data(tpd);
987 #endif
988     ASSERT(!erts_thr_progress_is_managed_thread());
989 
990     unmanaged_continue(handle);
991 }
992 
993 ErtsThrPrgrDelayHandle
994 erts_thr_progress_unmanaged_delay__(void)
995 {
996     int umrefc_ix;
997     ASSERT(!erts_thr_progress_is_managed_thread());
998     umrefc_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
999     while (1) {
1000 	int tmp_ix;
1001 	erts_atomic_inc_acqb(&intrnl->umrefc[umrefc_ix].refc);
1002 	tmp_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
1003 	if (tmp_ix == umrefc_ix)
1004 	    break;
1005 	unmanaged_continue(umrefc_ix);
1006 	umrefc_ix = tmp_ix;
1007     }
1008 #ifdef ERTS_ENABLE_LOCK_CHECK
1009     {
1010 	ErtsThrPrgrData *tpd = tmp_thr_prgr_data(NULL);
1011 	tpd->is_delaying++;
1012     }
1013 #endif
1014     return (ErtsThrPrgrDelayHandle) umrefc_ix;
1015 }
1016 
1017 static ERTS_INLINE int
1018 has_reached_wakeup(ErtsThrPrgrVal wakeup)
1019 {
1020     /*
1021      * Exactly the same as erts_thr_progress_has_reached(), but
1022      * also verifies valid wakeup requests in debug mode.
1023      */
1024     ErtsThrPrgrVal current;
1025 
1026     current = read_acqb(&erts_thr_prgr__.current);
1027 
1028 #if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
1029     {
1030 	ErtsThrPrgrVal limit;
1031 	/*
1032 	 * erts_thr_progress_later() returns values which are
1033 	 * equal to 'current + 2', or 'current + 3'. That is, users
1034 	 * should never get a hold of values larger than that.
1035 	 *
1036 	 * That is, valid values are values less than 'current + 4'.
1037 	 *
1038 	 * Values larger than this won't work with the wakeup
1039 	 * algorithm.
1040 	 */
1041 
1042 	limit = current + 4;
1043 	if (limit == ERTS_THR_PRGR_VAL_WAITING)
1044 	    limit = 0;
1045 	else if (limit < current) /* Wrapped */
1046 	    limit += 1;
1047 
1048 	if (!erts_thr_progress_has_passed__(limit, wakeup))
1049 	    erts_exit(ERTS_ABORT_EXIT,
1050 		     "Invalid wakeup request value found:"
1051 		     " current=%b64u, wakeup=%b64u, limit=%b64u",
1052 		     current, wakeup, limit);
1053     }
1054 #endif
1055 
1056     if (current == wakeup)
1057 	return 1;
1058     return erts_thr_progress_has_passed__(current, wakeup);
1059 }
1060 
1061 static void
1062 request_wakeup_managed(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
1063 {
1064     ErtsThrPrgrManagedWakeupData *mwd;
1065     int ix, wix;
1066 
1067     /*
1068      * Only managed threads that aren't in waiting state
1069      * and aren't deep sleepers are allowed to call this
1070      * function.
1071      */
1072 
1073     ASSERT(tpd->is_managed);
1074     ASSERT(tpd->confirmed != ERTS_THR_PRGR_VAL_WAITING);
1075     ASSERT(!tpd->is_deep_sleeper);
1076 
1077     if (has_reached_wakeup(value)) {
1078 	wakeup_managed(tpd->id);
1079 	return;
1080     }
1081 
1082     wix = ERTS_THR_PRGR_WAKEUP_IX(value);
1083     if (tpd->wakeup_request[wix] == value)
1084 	return; /* Already got a request registered */
1085 
1086     ASSERT(erts_thr_progress_has_passed__(value,
1087 					  tpd->wakeup_request[wix]));
1088 
1089 
1090     if (tpd->confirmed == value) {
1091 	/*
1092 	 * We have already confirmed this value. We need to request
1093 	 * wakeup for a value later than our latest confirmed value in
1094 	 * order to prevent progress from reaching the requested value
1095 	 * while we are writing the request.
1096 	 *
1097 	 * It is ok to move the wakeup request forward since the only
1098 	 * guarantee we make (and can make) is that the thread will be
1099 	 * woken some time *after* the requested value has been reached.
1100 	 */
1101 	value++;
1102 	if (value == ERTS_THR_PRGR_VAL_WAITING)
1103 	    value = 0;
1104 
1105 	wix = ERTS_THR_PRGR_WAKEUP_IX(value);
1106 	if (tpd->wakeup_request[wix] == value)
1107 	    return; /* Already got a request registered */
1108 
1109 	ASSERT(erts_thr_progress_has_passed__(value,
1110 					      tpd->wakeup_request[wix]));
1111     }
1112 
1113     tpd->wakeup_request[wix] = value;
1114 
1115     mwd = intrnl->managed.data[wix];
1116 
1117     ix = erts_atomic32_inc_read_nob(&mwd->len) - 1;
1118 #if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
1119     if (ix >= intrnl->managed.no)
1120 	erts_exit(ERTS_ABORT_EXIT, "Internal error: Too many wakeup requests\n");
1121 #endif
1122     mwd->id[ix] = tpd->id;
1123 
1124     ASSERT(!erts_thr_progress_has_reached(value));
1125 
1126     /*
1127      * This thread is guaranteed to issue a full memory barrier:
1128      * - after the request has been written, but
1129      * - before the global thread progress reach the (possibly
1130      *   increased) requested wakeup value.
1131      */
1132 }
1133 
1134 static void
1135 request_wakeup_unmanaged(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
1136 {
1137     int wix, ix, id, bit;
1138     ErtsThrPrgrUnmanagedWakeupData *umwd;
1139 
1140     ASSERT(!tpd->is_managed);
1141 
1142     /*
1143      * Thread progress *can* reach and pass our requested value while
1144      * we are writing the request.
1145      */
1146 
1147     if (has_reached_wakeup(value)) {
1148 	wakeup_unmanaged(tpd->id);
1149 	return;
1150     }
1151 
1152     wix = ERTS_THR_PRGR_WAKEUP_IX(value);
1153 
1154     if (tpd->wakeup_request[wix] == value)
1155 	return; /* Already got a request registered */
1156 
1157     ASSERT(erts_thr_progress_has_passed__(value,
1158 					  tpd->wakeup_request[wix]));
1159 
1160     umwd = intrnl->unmanaged.data[wix];
1161 
1162     id = tpd->id;
1163 
1164     bit = id & ERTS_THR_PRGR_BM_MASK;
1165     ix = id >> ERTS_THR_PRGR_BM_SHIFT;
1166     ASSERT(0 <= ix && ix < umwd->low_sz);
1167     erts_atomic32_read_bor_nob(&umwd->low[ix], 1 << bit);
1168 
1169     bit = ix & ERTS_THR_PRGR_BM_MASK;
1170     ix >>= ERTS_THR_PRGR_BM_SHIFT;
1171     ASSERT(0 <= ix && ix < umwd->high_sz);
1172     erts_atomic32_read_bor_nob(&umwd->high[ix], 1 << bit);
1173 
1174     erts_atomic32_inc_mb(&umwd->len);
1175 
1176     if (erts_thr_progress_has_reached(value))
1177 	wakeup_unmanaged(tpd->id);
1178     else
1179 	tpd->wakeup_request[wix] = value;
1180 }
1181 
1182 void
1183 erts_thr_progress_wakeup(ErtsThrPrgrData *tpd,
1184 			 ErtsThrPrgrVal value)
1185 {
1186 
1187     ASSERT(!tpd->is_temporary);
1188     if (tpd->is_managed)
1189 	request_wakeup_managed(tpd, value);
1190     else
1191 	request_wakeup_unmanaged(tpd, value);
1192 }
1193 
1194 static void
1195 wakeup_unmanaged_threads(ErtsThrPrgrUnmanagedWakeupData *umwd)
1196 {
1197     int hix;
1198     for (hix = 0; hix < umwd->high_sz; hix++) {
1199 	erts_aint32_t hmask = erts_atomic32_read_nob(&umwd->high[hix]);
1200 	if (hmask) {
1201 	    int hbase = hix << ERTS_THR_PRGR_BM_SHIFT;
1202 	    int hbit;
1203 	    for (hbit = 0; hbit < ERTS_THR_PRGR_BM_BITS; hbit++) {
1204 		if (hmask & (1U << hbit)) {
1205 		    erts_aint_t lmask;
1206 		    int lix = hbase + hbit;
1207 		    ASSERT(0 <= lix && lix < umwd->low_sz);
1208 		    lmask = erts_atomic32_read_nob(&umwd->low[lix]);
1209 		    if (lmask) {
1210 			int lbase = lix << ERTS_THR_PRGR_BM_SHIFT;
1211 			int lbit;
1212 			for (lbit = 0; lbit < ERTS_THR_PRGR_BM_BITS; lbit++) {
1213 			    if (lmask & (1U << lbit)) {
1214 				int id = lbase + lbit;
1215 				wakeup_unmanaged(id);
1216 			    }
1217 			}
1218 			erts_atomic32_set_nob(&umwd->low[lix], 0);
1219 		    }
1220 		}
1221 	    }
1222 	    erts_atomic32_set_nob(&umwd->high[hix], 0);
1223 	}
1224     }
1225 }
1226 
1227 
1228 static void
1229 handle_wakeup_requests(ErtsThrPrgrVal current)
1230 {
1231     ErtsThrPrgrManagedWakeupData *mwd;
1232     ErtsThrPrgrUnmanagedWakeupData *umwd;
1233     int wix, len, i;
1234 
1235     wix = ERTS_THR_PRGR_WAKEUP_IX(current);
1236 
1237     mwd = intrnl->managed.data[wix];
1238     len = erts_atomic32_read_nob(&mwd->len);
1239     ASSERT(len >= 0);
1240     if (len) {
1241 	for (i = 0; i < len; i++)
1242 	    wakeup_managed(mwd->id[i]);
1243 	erts_atomic32_set_nob(&mwd->len, 0);
1244     }
1245 
1246     umwd = intrnl->unmanaged.data[wix];
1247     len = erts_atomic32_read_nob(&umwd->len);
1248     ASSERT(len >= 0);
1249     if (len) {
1250 	wakeup_unmanaged_threads(umwd);
1251 	erts_atomic32_set_nob(&umwd->len, 0);
1252     }
1253 
1254 }
1255 
1256 static int
1257 got_sched_wakeups(void)
1258 {
1259     int wix;
1260 
1261     ERTS_THR_MEMORY_BARRIER;
1262 
1263     for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) {
1264  	ErtsThrPrgrManagedWakeupData **mwd = intrnl->managed.data;
1265 	if (erts_atomic32_read_nob(&mwd[wix]->len))
1266 	    return 1;
1267     }
1268     for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) {
1269  	ErtsThrPrgrUnmanagedWakeupData **umwd = intrnl->unmanaged.data;
1270 	if (erts_atomic32_read_nob(&umwd[wix]->len))
1271 	    return 1;
1272     }
1273     return 0;
1274 }
1275 
1276 static erts_aint32_t
1277 block_thread(ErtsThrPrgrData *tpd)
1278 {
1279     erts_aint32_t lflgs;
1280     ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[tpd->id];
1281 
1282     do {
1283 	block_count_dec();
1284 
1285 	while (1) {
1286 	    cbp->prepare_wait(cbp->arg);
1287 	    lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
1288 	    if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
1289 		cbp->wait(cbp->arg);
1290 	    else
1291 		break;
1292 	}
1293 
1294     } while (block_count_inc());
1295 
1296     cbp->finalize_wait(cbp->arg);
1297 
1298     return lflgs;
1299 }
1300 
1301 static erts_aint32_t
1302 thr_progress_block(ErtsThrPrgrData *tpd, int wait)
1303 {
1304     erts_tse_t *event = NULL; /* Remove erroneous warning... sigh... */
1305     erts_aint32_t lflgs, bc;
1306 
1307     if (tpd->is_blocking++)
1308 	return (erts_aint32_t) 0;
1309 
1310     while (1) {
1311 	lflgs = erts_atomic32_read_bor_nob(&intrnl->misc.data.lflgs,
1312 					   ERTS_THR_PRGR_LFLG_BLOCK);
1313 	if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
1314 	    block_thread(tpd);
1315 	else
1316 	    break;
1317     }
1318 
1319 #if ERTS_THR_PRGR_PRINT_BLOCKERS
1320     erts_fprintf(stderr, "block(%d)\n", tpd->id);
1321 #endif
1322 
1323     ASSERT(ERTS_AINT_NULL
1324 	   == erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
1325 
1326     if (wait) {
1327 	event = erts_tse_fetch();
1328 	erts_tse_reset(event);
1329 	erts_atomic_set_nob(&intrnl->misc.data.blocker_event,
1330 			    (erts_aint_t) event);
1331     }
1332     if (tpd->is_managed)
1333 	erts_atomic32_dec_nob(&intrnl->misc.data.block_count);
1334     bc = erts_atomic32_read_band_mb(&intrnl->misc.data.block_count,
1335 				    ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
1336     bc &= ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING;
1337     if (wait) {
1338 	while (bc != 0) {
1339 	    erts_tse_wait(event);
1340 	    erts_tse_reset(event);
1341 	    bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
1342 	}
1343     }
1344 
1345     /* tse event returned in erts_thr_progress_unblock() */
1346     return bc;
1347 
1348 }
1349 
1350 void
1351 erts_thr_progress_block(void)
1352 {
1353     thr_progress_block(tmp_thr_prgr_data(NULL), 1);
1354 }
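
/*
 * Typical (illustrative) use of the blocking API: a thread that needs all
 * managed threads stopped while it mutates shared state brackets the
 * mutation with a block/unblock pair. The helper below is hypothetical.
 *
 *     erts_thr_progress_block();
 *     mutate_state_no_managed_threads_running();
 *     erts_thr_progress_unblock();
 */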
1355 
1356 int
1357 erts_thr_progress_fatal_error_block(ErtsThrPrgrData *tmp_tpd_bufp)
1358 {
1359     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
1360 
1361     if (!tpd) {
1362 	/*
1363 	 * We stack allocate since failure to allocate memory may
1364 	 * have caused the problem in the first place. This is ok
1365 	 * since we never complete an unblock after a fatal error
1366 	 * block.
1367 	 */
1368 	tpd = tmp_tpd_bufp;
1369 	init_tmp_thr_prgr_data(tpd);
1370     }
1371 
1372     /* Returns the number of threads that have not yet been blocked */
1373     return thr_progress_block(tpd, 0);
1374 }
1375 
1376 void
1377 erts_thr_progress_fatal_error_wait(SWord timeout) {
1378     erts_aint32_t bc;
1379     SWord time_left = timeout;
1380     ErtsMonotonicTime timeout_time;
1381     ErtsSchedulerData *esdp = erts_get_scheduler_data();
1382 
1383     /*
1384      * Counting poll intervals may give us too long a timeout
1385      * if the CPU is busy. We use the timeout time to try to prevent
1386      * this. In case we haven't got time correction this may,
1387      * however, fail too...
1388      */
1389     timeout_time = erts_get_monotonic_time(esdp);
1390     timeout_time += ERTS_MSEC_TO_MONOTONIC((ErtsMonotonicTime) timeout);
1391 
1392     while (1) {
1393 	if (erts_milli_sleep(ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL) == 0)
1394 	    time_left -= ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL;
1395 	bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
1396 	if (bc == 0)
1397 	    break; /* Successfully blocked all managed threads */
1398 	if (time_left <= 0)
1399 	    break; /* Timeout */
1400 	if (timeout_time <= erts_get_monotonic_time(esdp))
1401 	    break; /* Timeout */
1402     }
1403 }
1404 
1405 void
1406 erts_thr_progress_unblock(void)
1407 {
1408     erts_tse_t *event;
1409     int id, break_id, sz, wakeup;
1410     ErtsThrPrgrData *tpd = thr_prgr_data(NULL);
1411 
1412     ASSERT(tpd->is_blocking);
1413     if (--tpd->is_blocking)
1414 	return;
1415 
1416     sz = intrnl->managed.no;
1417 
1418     wakeup = 1;
1419     if (!tpd->is_managed)
1420 	id = break_id = tpd->id < 0 ? 0 : tpd->id % sz;
1421     else {
1422 	break_id = tpd->id;
1423 	id = break_id + 1;
1424 	if (id >= sz)
1425 	    id = 0;
1426 	if (id == break_id)
1427 	    wakeup = 0;
1428 	erts_atomic32_inc_nob(&intrnl->misc.data.block_count);
1429     }
1430 
1431     event = ((erts_tse_t *)
1432 	     erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
1433     ASSERT(event);
1434     erts_atomic_set_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
1435 
1436     erts_atomic32_read_bor_relb(&intrnl->misc.data.block_count,
1437 				ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
1438 #if ERTS_THR_PRGR_PRINT_BLOCKERS
1439     erts_fprintf(stderr, "unblock(%d)\n", tpd->id);
1440 #endif
1441     erts_atomic32_read_band_mb(&intrnl->misc.data.lflgs,
1442 			       ~ERTS_THR_PRGR_LFLG_BLOCK);
1443 
1444     if (wakeup) {
1445 	do {
1446 	    ErtsThrPrgrVal tmp;
1447 	    tmp = read_nob(&intrnl->thr[id].data.current);
1448 	    if (tmp != ERTS_THR_PRGR_VAL_WAITING)
1449 		wakeup_managed(id);
1450 	    if (++id >= sz)
1451 		id = 0;
1452 	} while (id != break_id);
1453     }
1454 
1455     return_tmp_thr_prgr_data(tpd);
1456     erts_tse_return(event);
1457 }
1458 
1459 int
1460 erts_thr_progress_is_blocking(void)
1461 {
1462     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
1463     return tpd && tpd->is_blocking;
1464 }
1465 
1466 void erts_thr_progress_dbg_print_state(void)
1467 {
1468     int id;
1469     int sz = intrnl->managed.no;
1470 
1471     erts_fprintf(stderr, "--- thread progress ---\n");
1472     erts_fprintf(stderr,"current=%b64u\n", erts_thr_progress_current());
1473     for (id = 0; id < sz; id++) {
1474 	ErtsThrPrgrVal current = read_nob(&intrnl->thr[id].data.current);
1475 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1476 	erts_aint32_t state_debug;
1477 	char *active, *leader;
1478 
1479 	state_debug = erts_atomic32_read_nob(&intrnl->thr[id].data.state_debug);
1480 	active = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE
1481 		  ? "true"
1482 		  : "false");
1483 	leader = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_LEADER
1484 		  ? "true"
1485 		  : "false");
1486 #endif
1487 	if (current == ERTS_THR_PRGR_VAL_WAITING)
1488 	    erts_fprintf(stderr,
1489 			 "  id=%d, current=WAITING"
1490 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1491 			 ", active=%s, leader=%s"
1492 #endif
1493 			 "\n", id
1494 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1495 			 , active, leader
1496 #endif
1497 		);
1498 	else
1499 	    erts_fprintf(stderr,
1500 			 "  id=%d, current=%b64u"
1501 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1502 			 ", active=%s, leader=%s"
1503 #endif
1504 			 "\n", id, current
1505 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1506 			 , active, leader
1507 #endif
1508 		);
1509     }
1510     erts_fprintf(stderr, "-----------------------\n");
1511 
1512 
1513 }
1514 
1515