1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 2011-2018. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22  * Description: Thread progress information. Used by lock free algorithms
23  *              to determine when all involved threads are guaranteed to
24  *              have passed a specific point of execution.
25  *
26  *              Usage instructions below.
27  *
28  * Author: 	Rickard Green
29  */
30 
31 /*
32  * ------ Usage instructions -----------------------------------------------
33  *
34  * This module keeps track of the progress of a set of managed threads. Only
35  * threads that behave well can be allowed to be managed. A managed thread
36  * should update its thread progress frequently. Currently only scheduler
37  * threads, the system-message-dispatcher threads, and the aux-thread are
38  * managed threads. We typically do not want any async threads as managed
39  * threads, since they execute user-implemented driver code that is assumed
40  * to be time consuming and therefore cannot guarantee frequent thread
41  * progress updates.
42  *
43  * erts_thr_progress_current() returns the global current thread progress
44  * value of managed threads. I.e., the latest progress value that all
45  * managed threads have reached. Thread progress values are opaque.
46  *
47  * erts_thr_progress_has_reached(VAL) returns a value != 0 if current
48  * global thread progress has reached or passed VAL.
49  *
50  * erts_thr_progress_later() returns a thread progress value in the future
51  * which no managed thread has yet reached.
52  *
53  * All threads issue a full memory barrier when reaching a new thread
54  * progress value. They only reach new thread progress values in specific
55  * controlled states when calling erts_thr_progress_update(). Schedulers
56  * call erts_thr_progress_update() in between execution of processes,
57  * when going to sleep and when waking up.
58  *
59  * Sleeping managed threads are considered to have reached the next thread
60  * progress value immediately. They are not woken, and therefore do not
61  * issue any memory barriers when a new thread progress value is reached.
62  * A sleeping thread does, however, issue a memory barrier immediately
63  * upon wakeup.
64  *
65  * Both managed and registered unmanaged threads may request wakeup when
66  * the global thread progress reaches a certain value using
67  * erts_thr_progress_wakeup().
68  *
69  * Note that thread progress values are opaque, and that you are only
70  * allowed to use thread progress values retrieved from this API!
71  *
72  * -------------------------------------------------------------------------
73  */
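
/*
 * Illustrative usage sketch (not part of this module): the typical
 * lock-free deferred-deallocation pattern built on the API described
 * above. The defer_free_t type and the my_free()/my_maybe_free()
 * helpers are hypothetical and only show when the calls are safe to
 * make; the exact declarations of the erts_thr_progress_*() functions
 * are found in erl_thr_progress.h.
 *
 *   typedef struct { ErtsThrPrgrVal later; void *block; } defer_free_t;
 *
 *   static void my_defer_free(defer_free_t *df, void *block)
 *   {
 *       // No managed thread has reached this value yet; once global
 *       // progress reaches it, every managed thread has passed the
 *       // point where it could still hold a reference to 'block'.
 *       df->later = erts_thr_progress_later(esdp);
 *       df->block = block;
 *   }
 *
 *   static void my_maybe_free(defer_free_t *df)
 *   {
 *       if (df->block && erts_thr_progress_has_reached(df->later)) {
 *           my_free(df->block);   // hypothetical deallocator
 *           df->block = NULL;
 *       }
 *   }
 */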
74 
75 #ifdef HAVE_CONFIG_H
76 #  include "config.h"
77 #endif
78 
79 #include <stddef.h> /* offsetof() */
80 #include "erl_thr_progress.h"
81 #include "global.h"
82 
83 
84 #define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 0
85 
86 #ifdef DEBUG
87 #undef ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
88 #define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 1
89 #endif
90 
91 #define ERTS_THR_PRGR_PRINT_LEADER 0
92 #define ERTS_THR_PRGR_PRINT_VAL 0
93 #define ERTS_THR_PRGR_PRINT_BLOCKERS 0
94 
95 #define ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL 100
96 
97 #define ERTS_THR_PRGR_LFLG_BLOCK	((erts_aint32_t) (1U << 31))
98 #define ERTS_THR_PRGR_LFLG_NO_LEADER	((erts_aint32_t) (1U << 30))
99 #define ERTS_THR_PRGR_LFLG_WAITING_UM	((erts_aint32_t) (1U << 29))
100 #define ERTS_THR_PRGR_LFLG_ACTIVE_MASK	(~(ERTS_THR_PRGR_LFLG_NO_LEADER	\
101 					   | ERTS_THR_PRGR_LFLG_BLOCK	\
102 					   | ERTS_THR_PRGR_LFLG_WAITING_UM))
103 
104 #define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS)				\
105     ((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK)
106 
107 /*
108  * We use a 64-bit value for thread progress. Because of this, wrapping
109  * of the thread progress value will more or less never occur.
110  *
111  * On 32-bit systems we therefore need a double-word atomic.
112  */
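
/*
 * Illustrative sketch only -- the real comparisons live in
 * erl_thr_progress.h (erts_thr_progress_has_passed__() and friends).
 * With a 64-bit counter, "v1 is at or past v0" can be decided with
 * wrap-tolerant modular arithmetic, ignoring the special
 * ERTS_THR_PRGR_VAL_WAITING value:
 *
 *   static int at_or_past(ErtsThrPrgrVal v1, ErtsThrPrgrVal v0)
 *   {
 *       // Unsigned subtraction wraps, so the result stays correct even
 *       // across the (practically unreachable) 64-bit wrap-around.
 *       return (ErtsThrPrgrVal) (v1 - v0) < (((ErtsThrPrgrVal) 1) << 63);
 *   }
 */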
113 #undef read_acqb
114 #define read_acqb erts_thr_prgr_read_acqb__
115 #undef read_nob
116 #define read_nob erts_thr_prgr_read_nob__
117 
118 static ERTS_INLINE void
119 set_mb(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
120 {
121     erts_atomic64_set_mb(atmc, (erts_aint64_t) val);
122 }
123 
124 static ERTS_INLINE void
125 set_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
126 {
127     erts_atomic64_set_nob(atmc, (erts_aint64_t) val);
128 }
129 
130 static ERTS_INLINE void
131 init_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
132 {
133     erts_atomic64_init_nob(atmc, (erts_aint64_t) val);
134 }
135 
136 /* #define ERTS_THR_PROGRESS_STATE_DEBUG */
137 
138 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
139 
140 #ifdef __GNUC__
141 #warning "Thread progress state debug is on"
142 #endif
143 
144 #define ERTS_THR_PROGRESS_STATE_DEBUG_LEADER	((erts_aint32_t) (1U << 0))
145 #define ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE	((erts_aint32_t) (1U << 1))
146 
147 #define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID)						\
148     erts_atomic32_init_nob(&intrnl->thr[(ID)].data.state_debug,				\
149 			   ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE)
150 
151 #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON)				\
152 do {											\
153     erts_aint32_t state_debug__;							\
154     state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug);	\
155     if ((ON))										\
156 	state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE;				\
157     else										\
158 	state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE;				\
159     erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__);		\
160 } while (0)
161 
162 #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON)				\
163 do {											\
164     erts_aint32_t state_debug__;							\
165     state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug);	\
166     if ((ON))										\
167 	state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_LEADER;				\
168     else										\
169 	state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_LEADER;				\
170     erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__);		\
171 } while (0)
172 
173 #else
174 
175 #define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID)
176 #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON)
177 #define ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON)
178 
179 #endif /* ERTS_THR_PROGRESS_STATE_DEBUG */
180 
181 #define ERTS_THR_PRGR_BLCKR_INVALID        ((erts_aint32_t) (~0U))
182 #define ERTS_THR_PRGR_BLCKR_UNMANAGED      ((erts_aint32_t) (1U << 31))
183 
184 #define ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING  ((erts_aint32_t) (1U << 31))
185 
186 #define ERTS_THR_PRGR_BM_BITS 32
187 #define ERTS_THR_PRGR_BM_SHIFT 5
188 #define ERTS_THR_PRGR_BM_MASK 0x1f
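
/*
 * Illustrative example of the two-level bitmap addressing used for
 * unmanaged wakeup requests (see request_wakeup_unmanaged() and
 * wakeup_unmanaged_threads() below): an unmanaged thread id of 70 maps
 * to bit 70 & ERTS_THR_PRGR_BM_MASK == 6 of low word
 * 70 >> ERTS_THR_PRGR_BM_SHIFT == 2, and low word 2 in turn maps to
 * bit 2 of high word 0. The 'high' bitmap thus records which 'low'
 * words contain any set bits at all.
 */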
189 
190 #define ERTS_THR_PRGR_WAKEUP_DATA_MASK (ERTS_THR_PRGR_WAKEUP_DATA_SIZE - 1)
191 
192 #define ERTS_THR_PRGR_WAKEUP_IX(V) \
193     ((int) ((V) & ERTS_THR_PRGR_WAKEUP_DATA_MASK))
194 
195 typedef struct {
196     erts_atomic32_t len;
197     int id[1];
198 } ErtsThrPrgrManagedWakeupData;
199 
200 typedef struct {
201     erts_atomic32_t len;
202     int high_sz;
203     int low_sz;
204     erts_atomic32_t *high;
205     erts_atomic32_t *low;
206 } ErtsThrPrgrUnmanagedWakeupData;
207 
208 typedef struct {
209     erts_atomic32_t lflgs;
210     erts_atomic32_t block_count;
211     erts_atomic_t blocker_event;
212     erts_atomic32_t pref_wakeup_used;
213     erts_atomic32_t managed_count;
214     erts_atomic32_t managed_id;
215     erts_atomic32_t unmanaged_id;
216     int chk_next_ix;
217     struct {
218 	int waiting;
219 	erts_atomic32_t current;
220     } umrefc_ix;
221 } ErtsThrPrgrMiscData;
222 
223 typedef struct {
224     ERTS_THR_PRGR_ATOMIC current;
225 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
226     erts_atomic32_t state_debug;
227 #endif
228 } ErtsThrPrgrElement;
229 
230 typedef union {
231     ErtsThrPrgrElement data;
232     char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrPrgrElement))];
233 } ErtsThrPrgrArray;
234 
235 typedef union {
236     erts_atomic_t refc;
237     char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_atomic_t))];
238 } ErtsThrPrgrUnmanagedRefc;
239 
240 typedef struct {
241     union {
242 	ErtsThrPrgrMiscData data;
243 	char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
244 		sizeof(ErtsThrPrgrMiscData))];
245     } misc;
246     ErtsThrPrgrUnmanagedRefc umrefc[2];
247     ErtsThrPrgrArray *thr;
248     struct {
249 	int no;
250 	ErtsThrPrgrCallbacks *callbacks;
251 	ErtsThrPrgrManagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
252     } managed;
253     struct {
254 	int no;
255 	ErtsThrPrgrCallbacks *callbacks;
256 	ErtsThrPrgrUnmanagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
257     } unmanaged;
258 } ErtsThrPrgrInternalData;
259 
260 static ErtsThrPrgrInternalData *intrnl;
261 
262 ErtsThrPrgr erts_thr_prgr__;
263 
264 erts_tsd_key_t erts_thr_prgr_data_key__;
265 
266 static void handle_wakeup_requests(ErtsThrPrgrVal current);
267 static int got_sched_wakeups(void);
268 static erts_aint32_t block_thread(ErtsThrPrgrData *tpd);
269 
270 static ERTS_INLINE void
271 wakeup_managed(int id)
272 {
273     ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[id];
274     ASSERT(0 <= id && id < intrnl->managed.no);
275     cbp->wakeup(cbp->arg);
276 }
277 
278 
279 static ERTS_INLINE void
280 wakeup_unmanaged(int id)
281 {
282     ErtsThrPrgrCallbacks *cbp = &intrnl->unmanaged.callbacks[id];
283     ASSERT(0 <= id && id < intrnl->unmanaged.no);
284     cbp->wakeup(cbp->arg);
285 }
286 
287 static ERTS_INLINE ErtsThrPrgrData *
288 perhaps_thr_prgr_data(ErtsSchedulerData *esdp)
289 {
290     if (esdp)
291 	return &esdp->thr_progress_data;
292     else
293 	return erts_tsd_get(erts_thr_prgr_data_key__);
294 }
295 
296 static ERTS_INLINE ErtsThrPrgrData *
297 thr_prgr_data(ErtsSchedulerData *esdp)
298 {
299     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
300     ASSERT(tpd);
301     return tpd;
302 }
303 
304 static void
305 init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
306 {
307     tpd->id = -1;
308     tpd->is_managed = 0;
309     tpd->is_blocking = 0;
310     tpd->is_temporary = 1;
311 #ifdef ERTS_ENABLE_LOCK_CHECK
312     tpd->is_delaying = 0;
313 #endif
314     erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
315 }
316 
317 static ERTS_INLINE ErtsThrPrgrData *
318 tmp_thr_prgr_data(ErtsSchedulerData *esdp)
319 {
320     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
321 
322     if (!tpd) {
323         /*
324          * We only allocate the part up to the wakeup_request field which is
325          * the first field only used by registered threads
326          */
327         size_t alloc_size = offsetof(ErtsThrPrgrData, wakeup_request);
328 
329         /* We may land here as a result of unmanaged_delay being called from
330          * the lock counting module, which in turn might be called from within
331          * the allocator, so we use plain malloc to avoid deadlocks. */
332         tpd =
333 #ifdef ERTS_ENABLE_LOCK_COUNT
334             malloc(alloc_size);
335 #else
336             erts_alloc(ERTS_ALC_T_T_THR_PRGR_DATA, alloc_size);
337 #endif
338 
339         init_tmp_thr_prgr_data(tpd);
340     }
341 
342     return tpd;
343 }
344 
345 static ERTS_INLINE void
346 return_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
347 {
348     if (tpd->is_temporary) {
349         erts_tsd_set(erts_thr_prgr_data_key__, NULL);
350 
351 #ifdef ERTS_ENABLE_LOCK_COUNT
352         free(tpd);
353 #else
354         erts_free(ERTS_ALC_T_T_THR_PRGR_DATA, tpd);
355 #endif
356     }
357 }
358 
359 static ERTS_INLINE int
360 block_count_dec(void)
361 {
362     erts_aint32_t block_count;
363     block_count = erts_atomic32_dec_read_mb(&intrnl->misc.data.block_count);
364     if (block_count == 0) {
365 	erts_tse_t *event;
366 	event = ((erts_tse_t*)
367 		 erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
368 	if (event)
369 	    erts_tse_set(event);
370 	return 1;
371     }
372 
373     return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
374 }
375 
376 static ERTS_INLINE int
377 block_count_inc(void)
378 {
379     erts_aint32_t block_count;
380     block_count = erts_atomic32_inc_read_mb(&intrnl->misc.data.block_count);
381     return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
382 }
383 
384 
385 void
386 erts_thr_progress_pre_init(void)
387 {
388     intrnl = NULL;
389     erts_tsd_key_create(&erts_thr_prgr_data_key__,
390 			"erts_thr_prgr_data_key");
391     init_nob(&erts_thr_prgr__.current, ERTS_THR_PRGR_VAL_FIRST);
392 }
393 
394 void
395 erts_thr_progress_init(int no_schedulers, int managed, int unmanaged)
396 {
397     int i, j, um_low, um_high;
398     char *ptr;
399     size_t cb_sz, intrnl_sz, thr_arr_sz, m_wakeup_size, um_wakeup_size,
400 	tot_size;
401 
402     intrnl_sz = sizeof(ErtsThrPrgrInternalData);
403     intrnl_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(intrnl_sz);
404 
405     cb_sz = sizeof(ErtsThrPrgrCallbacks)*(managed+unmanaged);
406     cb_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(cb_sz);
407 
408     thr_arr_sz = sizeof(ErtsThrPrgrArray)*managed;
409     ASSERT(thr_arr_sz == ERTS_ALC_CACHE_LINE_ALIGN_SIZE(thr_arr_sz));
410 
411     m_wakeup_size = sizeof(ErtsThrPrgrManagedWakeupData);
412     m_wakeup_size += (managed - 1)*sizeof(int);
413     m_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(m_wakeup_size);
414 
415     um_low = (unmanaged - 1)/ERTS_THR_PRGR_BM_BITS + 1;
416     um_high = (um_low - 1)/ERTS_THR_PRGR_BM_BITS + 1;
417 
418     um_wakeup_size = sizeof(ErtsThrPrgrUnmanagedWakeupData);
419     um_wakeup_size += (um_high + um_low)*sizeof(erts_atomic32_t);
420     um_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(um_wakeup_size);
421 
422     tot_size = intrnl_sz;
423     tot_size += cb_sz;
424     tot_size += thr_arr_sz;
425     tot_size += m_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE;
426     tot_size += um_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE;
427 
428     ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_THR_PRGR_IDATA,
429 					     tot_size);
430 
431     intrnl = (ErtsThrPrgrInternalData *) ptr;
432     ptr += intrnl_sz;
433 
434     erts_atomic32_init_nob(&intrnl->misc.data.lflgs,
435 			   ERTS_THR_PRGR_LFLG_NO_LEADER);
436     erts_atomic32_init_nob(&intrnl->misc.data.block_count,
437 			   (ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING
438 			    | (erts_aint32_t) managed));
439     erts_atomic_init_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
440     erts_atomic32_init_nob(&intrnl->misc.data.pref_wakeup_used, 0);
441     erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0);
442     erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers);
443     erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1);
444     intrnl->misc.data.chk_next_ix = 0;
445     intrnl->misc.data.umrefc_ix.waiting = -1;
446     erts_atomic32_init_nob(&intrnl->misc.data.umrefc_ix.current, 0);
447 
448     erts_atomic_init_nob(&intrnl->umrefc[0].refc, (erts_aint_t) 0);
449     erts_atomic_init_nob(&intrnl->umrefc[1].refc, (erts_aint_t) 0);
450 
451     intrnl->thr = (ErtsThrPrgrArray *) ptr;
452     ptr += thr_arr_sz;
453     for (i = 0; i < managed; i++)
454 	init_nob(&intrnl->thr[i].data.current, 0);
455 
456     intrnl->managed.callbacks = (ErtsThrPrgrCallbacks *) ptr;
457     intrnl->unmanaged.callbacks = &intrnl->managed.callbacks[managed];
458     ptr += cb_sz;
459 
460     intrnl->managed.no = managed;
461     for (i = 0; i < managed; i++) {
462 	intrnl->managed.callbacks[i].arg = NULL;
463 	intrnl->managed.callbacks[i].wakeup = NULL;
464     }
465 
466     intrnl->unmanaged.no = unmanaged;
467     for (i = 0; i < unmanaged; i++) {
468 	intrnl->unmanaged.callbacks[i].arg = NULL;
469 	intrnl->unmanaged.callbacks[i].wakeup = NULL;
470     }
471 
472     for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
473 	intrnl->managed.data[i] = (ErtsThrPrgrManagedWakeupData *) ptr;
474 	erts_atomic32_init_nob(&intrnl->managed.data[i]->len, 0);
475 	ptr += m_wakeup_size;
476     }
477 
478     for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
479 	erts_atomic32_t *bm;
480 	intrnl->unmanaged.data[i] = (ErtsThrPrgrUnmanagedWakeupData *) ptr;
481 	erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->len, 0);
482 	bm = (erts_atomic32_t *) (ptr + sizeof(ErtsThrPrgrUnmanagedWakeupData));
483 	intrnl->unmanaged.data[i]->high = bm;
484 	intrnl->unmanaged.data[i]->high_sz = um_high;
485 	for (j = 0; j < um_high; j++)
486 	    erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->high[j], 0);
487 	intrnl->unmanaged.data[i]->low
488 	    = &intrnl->unmanaged.data[i]->high[um_high];
489 	intrnl->unmanaged.data[i]->low_sz = um_low;
490 	for (j = 0; j < um_low; j++)
491 	    erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->low[j], 0);
492 	ptr += um_wakeup_size;
493     }
494     ERTS_THR_MEMORY_BARRIER;
495 }
496 
497 static void
498 init_wakeup_request_array(ErtsThrPrgrVal *w)
499 {
500     int i;
501     ErtsThrPrgrVal current;
502 
503     current = read_acqb(&erts_thr_prgr__.current);
504     for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
505 	w[i] = current - ((ErtsThrPrgrVal) (ERTS_THR_PRGR_WAKEUP_DATA_SIZE + i));
506 	if (w[i] > current)
507 	    w[i]--;
508     }
509 }
510 
511 ErtsThrPrgrData *erts_thr_progress_data(void) {
512     return erts_tsd_get(erts_thr_prgr_data_key__);
513 }
514 
515 void
516 erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks)
517 {
518     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
519     int is_blocking = 0;
520 
521     if (tpd) {
522 	if (!tpd->is_temporary)
523 	    erts_exit(ERTS_ABORT_EXIT,
524 		     "%s:%d:%s(): Double register of thread\n",
525 		     __FILE__, __LINE__, __func__);
526 	is_blocking = tpd->is_blocking;
527 	return_tmp_thr_prgr_data(tpd);
528     }
529 
530     /*
531      * We only allocate the part up to the leader field
532      * which is the first field only used by managed threads
533      */
534     tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA,
535 		     offsetof(ErtsThrPrgrData, leader));
536     tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->misc.data.unmanaged_id);
537     tpd->is_managed = 0;
538     tpd->is_blocking = is_blocking;
539     tpd->is_temporary = 0;
540 #ifdef ERTS_ENABLE_LOCK_CHECK
541     tpd->is_delaying = 0;
542 #endif
543     ASSERT(tpd->id >= 0);
544     if (tpd->id >= intrnl->unmanaged.no)
545 	erts_exit(ERTS_ABORT_EXIT,
546 		 "%s:%d:%s(): Too many unmanaged registered threads\n",
547 		 __FILE__, __LINE__, __func__);
548 
549     init_wakeup_request_array(&tpd->wakeup_request[0]);
550     erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
551 
552     ASSERT(callbacks->wakeup);
553 
554     intrnl->unmanaged.callbacks[tpd->id] = *callbacks;
555 }
556 
557 
558 ErtsThrPrgrData *
559 erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
560 					  ErtsThrPrgrCallbacks *callbacks,
561 					  int pref_wakeup,
562                                           int deep_sleeper)
563 {
564     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
565     int is_blocking = 0, managed;
566 
567     if (tpd) {
568 	if (!tpd->is_temporary)
569 	    erts_exit(ERTS_ABORT_EXIT,
570 		     "%s:%d:%s(): Double register of thread\n",
571 		     __FILE__, __LINE__, __func__);
572 	is_blocking = tpd->is_blocking;
573 	return_tmp_thr_prgr_data(tpd);
574     }
575 
576     if (esdp)
577 	tpd = &esdp->thr_progress_data;
578     else
579 	tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, sizeof(ErtsThrPrgrData));
580 
581     if (pref_wakeup
582 	&& !erts_atomic32_xchg_nob(&intrnl->misc.data.pref_wakeup_used, 1))
583 	tpd->id = 0;
584     else if (esdp)
585 	tpd->id = (int) esdp->no;
586     else
587 	tpd->id = erts_atomic32_inc_read_nob(&intrnl->misc.data.managed_id);
588     ASSERT(tpd->id >= 0);
589     if (tpd->id >= intrnl->managed.no)
590 	erts_exit(ERTS_ABORT_EXIT,
591 		 "%s:%d:%s(): Too many managed registered threads\n",
592 		 __FILE__, __LINE__, __func__);
593 
594     tpd->is_managed = 1;
595     tpd->is_blocking = is_blocking;
596     tpd->is_temporary = 0;
597     tpd->is_deep_sleeper = deep_sleeper;
598 #ifdef ERTS_ENABLE_LOCK_CHECK
599     tpd->is_delaying = 1;
600 #endif
601 
602     init_wakeup_request_array(&tpd->wakeup_request[0]);
603 
604     ERTS_THR_PROGRESS_STATE_DEBUG_INIT(tpd->id);
605 
606     tpd->leader = 0;
607     tpd->active = 1;
608     tpd->confirmed = 0;
609     tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
610     erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
611 
612     erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
613 
614     ASSERT(callbacks->wakeup);
615     ASSERT(callbacks->prepare_wait);
616     ASSERT(callbacks->wait);
617     ASSERT(callbacks->finalize_wait);
618 
619     intrnl->managed.callbacks[tpd->id] = *callbacks;
620 
621     callbacks->prepare_wait(callbacks->arg);
622     managed = erts_atomic32_inc_read_relb(&intrnl->misc.data.managed_count);
623     if (managed != intrnl->managed.no) {
624 	/* Wait until all managed threads have registered... */
625 	do {
626 	    callbacks->wait(callbacks->arg);
627 	    callbacks->prepare_wait(callbacks->arg);
628 	    managed = erts_atomic32_read_acqb(&intrnl->misc.data.managed_count);
629 	} while (managed != intrnl->managed.no);
630     }
631     else {
632 	int id;
633 	/* All managed threads have registered; let's go... */
634 	for (id = 0; id < managed; id++)
635 	    if (id != tpd->id)
636 		wakeup_managed(id);
637     }
638     callbacks->finalize_wait(callbacks->arg);
639     return tpd;
640 }
641 
642 static ERTS_INLINE int
643 leader_update(ErtsThrPrgrData *tpd)
644 {
645 #ifdef ERTS_ENABLE_LOCK_CHECK
646     erts_lc_check_exact(NULL, 0);
647 #endif
648     if (!tpd->leader) {
649 	/* Probably need to block... */
650 	block_thread(tpd);
651     }
652     else {
653 	ErtsThrPrgrVal current;
654 	int ix, chk_next_ix, umrefc_ix, my_ix, no_managed, waiting_unmanaged;
655 	erts_aint32_t lflgs;
656 	ErtsThrPrgrVal next;
657 	erts_aint_t refc;
658 
659 	my_ix = tpd->id;
660 
661 	if (tpd->leader_state.current == ERTS_THR_PRGR_VAL_WAITING) {
662 	    /* Took over as leader from another thread */
663 	    tpd->leader_state.current = read_nob(&erts_thr_prgr__.current);
664 	    tpd->leader_state.next = tpd->leader_state.current;
665 	    tpd->leader_state.next++;
666 	    if (tpd->leader_state.next == ERTS_THR_PRGR_VAL_WAITING)
667 		tpd->leader_state.next = 0;
668 	    tpd->leader_state.chk_next_ix = intrnl->misc.data.chk_next_ix;
669 	    tpd->leader_state.umrefc_ix.waiting = intrnl->misc.data.umrefc_ix.waiting;
670 	    tpd->leader_state.umrefc_ix.current =
671 		(int) erts_atomic32_read_nob(&intrnl->misc.data.umrefc_ix.current);
672 
673 	    if (tpd->confirmed == tpd->leader_state.current) {
674 		ErtsThrPrgrVal val = tpd->leader_state.current + 1;
675 		if (val == ERTS_THR_PRGR_VAL_WAITING)
676 		    val = 0;
677 		tpd->confirmed = val;
678 		set_mb(&intrnl->thr[my_ix].data.current, val);
679 	    }
680 	}
681 
682 
683 	next = tpd->leader_state.next;
684 
685 	waiting_unmanaged = 0;
686 	umrefc_ix = -1; /* Shut up annoying warning */
687 
688 	chk_next_ix = tpd->leader_state.chk_next_ix;
689 	no_managed = intrnl->managed.no;
690 	ASSERT(0 <= chk_next_ix && chk_next_ix <= no_managed);
691 	/* Check managed threads */
692 	if (chk_next_ix < no_managed) {
693 	    for (ix = chk_next_ix; ix < no_managed; ix++) {
694 		ErtsThrPrgrVal tmp;
695 		if (ix == my_ix)
696 		    continue;
697 		tmp = read_nob(&intrnl->thr[ix].data.current);
698 		if (tmp != next && tmp != ERTS_THR_PRGR_VAL_WAITING) {
699 		    tpd->leader_state.chk_next_ix = ix;
700 		    ASSERT(erts_thr_progress_has_passed__(next, tmp));
701 		    goto done;
702 		}
703 	    }
704 	}
705 
706 	/* Check unmanaged threads */
707 	waiting_unmanaged = tpd->leader_state.umrefc_ix.waiting != -1;
708 	umrefc_ix = (waiting_unmanaged
709 		     ? tpd->leader_state.umrefc_ix.waiting
710 		     : tpd->leader_state.umrefc_ix.current);
711 	refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
712 	ASSERT(refc >= 0);
713 	if (refc != 0) {
714 	    int new_umrefc_ix;
715 
716 	    if (waiting_unmanaged)
717 		goto done;
718 
719 	    new_umrefc_ix = (umrefc_ix + 1) & 0x1;
720 	    tpd->leader_state.umrefc_ix.waiting = umrefc_ix;
721 	    tpd->leader_state.chk_next_ix = no_managed;
722 	    erts_atomic32_set_nob(&intrnl->misc.data.umrefc_ix.current,
723 				  (erts_aint32_t) new_umrefc_ix);
724 	    tpd->leader_state.umrefc_ix.current = new_umrefc_ix;
725 	    ETHR_MEMBAR(ETHR_StoreLoad);
726 	    refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
727 	    ASSERT(refc >= 0);
728 	    waiting_unmanaged = 1;
729 	    if (refc != 0)
730 		goto done;
731 	}
732 
733 	/* Make progress */
734 	current = next;
735 
736 	next++;
737 	if (next == ERTS_THR_PRGR_VAL_WAITING)
738 	    next = 0;
739 
740 	set_nob(&intrnl->thr[my_ix].data.current, next);
741 	set_mb(&erts_thr_prgr__.current, current);
742 	tpd->confirmed = next;
743 	tpd->leader_state.next = next;
744 	tpd->leader_state.current = current;
745 
746 #if ERTS_THR_PRGR_PRINT_VAL
747 	if (current % 1000 == 0)
748 	    erts_fprintf(stderr, "%b64u\n", current);
749 #endif
750 	handle_wakeup_requests(current);
751 
752 	if (waiting_unmanaged) {
753 	    waiting_unmanaged = 0;
754 	    tpd->leader_state.umrefc_ix.waiting = -1;
755 	    erts_atomic32_read_band_nob(&intrnl->misc.data.lflgs,
756 					~ERTS_THR_PRGR_LFLG_WAITING_UM);
757 	}
758 	tpd->leader_state.chk_next_ix = 0;
759 
760     done:
761 
762 	if (tpd->active) {
763 	    lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
764 	    if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
765 		(void) block_thread(tpd);
766 	}
767 	else {
768 	    int force_wakeup_check = 0;
769 	    erts_aint32_t set_flags = ERTS_THR_PRGR_LFLG_NO_LEADER;
770 	    tpd->leader = 0;
771 	    tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
772 #if ERTS_THR_PRGR_PRINT_LEADER
773 	    erts_fprintf(stderr, "L <- %d\n", tpd->id);
774 #endif
775 
776 	    ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0);
777 
778 	    intrnl->misc.data.umrefc_ix.waiting
779 		= tpd->leader_state.umrefc_ix.waiting;
780 	    if (waiting_unmanaged)
781 		set_flags |= ERTS_THR_PRGR_LFLG_WAITING_UM;
782 
783 	    lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs,
784 						set_flags);
785 	    lflgs |= set_flags;
786 	    if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
787 		lflgs = block_thread(tpd);
788 
789 	    if (waiting_unmanaged) {
790 		/* Need to check umrefc again */
791 		ETHR_MEMBAR(ETHR_StoreLoad);
792 		refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
793 		if (refc == 0) {
794 		    /* Need to force wakeup check */
795 		    force_wakeup_check = 1;
796 		}
797 	    }
798 
799 	    if ((force_wakeup_check
800 		 || ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
801 			       | ERTS_THR_PRGR_LFLG_WAITING_UM
802 			       | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
803 		     == ERTS_THR_PRGR_LFLG_NO_LEADER))
804 		&& got_sched_wakeups()) {
805 		/* Someone needs to make progress */
806 		wakeup_managed(tpd->id);
807 	    }
808 	}
809     }
810 
811     return tpd->leader;
812 }
813 
814 static int
815 update(ErtsThrPrgrData *tpd)
816 {
817     int res;
818     ErtsThrPrgrVal val;
819 
820     if (tpd->leader)
821 	res = 1;
822     else {
823 	erts_aint32_t lflgs;
824 	res = 0;
825 	val = read_acqb(&erts_thr_prgr__.current);
826 	if (tpd->confirmed == val) {
827 	    val++;
828 	    if (val == ERTS_THR_PRGR_VAL_WAITING)
829 		val = 0;
830 	    tpd->confirmed = val;
831 	    set_mb(&intrnl->thr[tpd->id].data.current, val);
832 	}
833 
834 	lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
835 	if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
836 	    res = 1; /* Need to block in leader_update() */
837 
838 	if ((lflgs & ERTS_THR_PRGR_LFLG_NO_LEADER)
839 	    && (tpd->active || ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0)) {
840 	    /* Try to take over leadership... */
841 	    erts_aint32_t olflgs;
842 	    olflgs = erts_atomic32_read_band_acqb(
843 		&intrnl->misc.data.lflgs,
844 		~ERTS_THR_PRGR_LFLG_NO_LEADER);
845 	    if (olflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) {
846 		tpd->leader = 1;
847 #if ERTS_THR_PRGR_PRINT_LEADER
848 		erts_fprintf(stderr, "L -> %d\n", tpd->id);
849 #endif
850 		ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 1);
851 	    }
852 	}
853 	res |= tpd->leader;
854     }
855     return res;
856 }
857 
858 int
859 erts_thr_progress_update(ErtsThrPrgrData *tpd)
860 {
861     return update(tpd);
862 }
863 
864 
865 int
866 erts_thr_progress_leader_update(ErtsThrPrgrData *tpd)
867 {
868     return leader_update(tpd);
869 }
870 
871 void
872 erts_thr_progress_prepare_wait(ErtsThrPrgrData *tpd)
873 {
874     erts_aint32_t lflgs;
875 
876 #ifdef ERTS_ENABLE_LOCK_CHECK
877     erts_lc_check_exact(NULL, 0);
878 #endif
879 
880     block_count_dec();
881 
882     tpd->confirmed = ERTS_THR_PRGR_VAL_WAITING;
883     set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING);
884 
885     lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
886 
887     if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
888 		  | ERTS_THR_PRGR_LFLG_WAITING_UM
889 		  | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
890 	== ERTS_THR_PRGR_LFLG_NO_LEADER
891 	&& got_sched_wakeups()) {
892 	/* Someone needs to make progress */
893         if (tpd->is_deep_sleeper)
894             wakeup_managed(1);
895         else
896             wakeup_managed(tpd->id);
897     }
898 }
899 
900 void
901 erts_thr_progress_finalize_wait(ErtsThrPrgrData *tpd)
902 {
903     ErtsThrPrgrVal current, val;
904 
905 #ifdef ERTS_ENABLE_LOCK_CHECK
906     erts_lc_check_exact(NULL, 0);
907 #endif
908 
909     /*
910      * We aren't allowed to continue until our thread
911      * progress is past global current.
912      */
913     val = current = read_acqb(&erts_thr_prgr__.current);
914     while (1) {
915 	val++;
916 	if (val == ERTS_THR_PRGR_VAL_WAITING)
917 	    val = 0;
918 	tpd->confirmed = val;
919 	set_mb(&intrnl->thr[tpd->id].data.current, val);
920 	val = read_acqb(&erts_thr_prgr__.current);
921 	if (current == val)
922 	    break;
923 	current = val;
924     }
925     if (block_count_inc())
926 	block_thread(tpd);
927     if (update(tpd))
928 	leader_update(tpd);
929 }
930 
931 void
932 erts_thr_progress_active(ErtsThrPrgrData *tpd, int on)
933 {
934 
935 #ifdef ERTS_ENABLE_LOCK_CHECK
936     erts_lc_check_exact(NULL, 0);
937 #endif
938 
939     ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(tpd->id, on);
940 
941     if (on) {
942 	ASSERT(!tpd->active);
943 	tpd->active = 1;
944 	erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
945     }
946     else {
947 	ASSERT(tpd->active);
948 	tpd->active = 0;
949 	erts_atomic32_dec_nob(&intrnl->misc.data.lflgs);
950 	if (update(tpd))
951 	    leader_update(tpd);
952     }
953 
954 #ifdef DEBUG
955     {
956 	erts_aint32_t n = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
957 	n &= ERTS_THR_PRGR_LFLG_ACTIVE_MASK;
958 	ASSERT(tpd->active <= n && n <= intrnl->managed.no);
959     }
960 #endif
961 
962 }
963 
964 static ERTS_INLINE void
965 unmanaged_continue(ErtsThrPrgrDelayHandle handle)
966 {
967     int umrefc_ix = (int) handle;
968     erts_aint_t refc;
969 
970     ASSERT(umrefc_ix == 0 || umrefc_ix == 1);
971     refc = erts_atomic_dec_read_relb(&intrnl->umrefc[umrefc_ix].refc);
972     ASSERT(refc >= 0);
973     if (refc == 0) {
974 	erts_aint_t lflgs;
975 	ERTS_THR_READ_MEMORY_BARRIER;
976 	lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
977 	if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
978 		      | ERTS_THR_PRGR_LFLG_WAITING_UM
979 		      | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
980 	    == (ERTS_THR_PRGR_LFLG_NO_LEADER|ERTS_THR_PRGR_LFLG_WAITING_UM)
981 	    && got_sched_wakeups()) {
982 	    /* Others waiting for us... */
983 	    wakeup_managed(1);
984 	}
985     }
986 }
987 
988 void
989 erts_thr_progress_unmanaged_continue__(ErtsThrPrgrDelayHandle handle)
990 {
991 #ifdef ERTS_ENABLE_LOCK_CHECK
992     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
993     ERTS_LC_ASSERT(tpd && tpd->is_delaying);
994     tpd->is_delaying--;
995     ASSERT(tpd->is_delaying >= 0);
996     if (!tpd->is_delaying)
997 	return_tmp_thr_prgr_data(tpd);
998 #endif
999     ASSERT(!erts_thr_progress_is_managed_thread());
1000 
1001     unmanaged_continue(handle);
1002 }
1003 
1004 ErtsThrPrgrDelayHandle
1005 erts_thr_progress_unmanaged_delay__(void)
1006 {
1007     int umrefc_ix;
1008     ASSERT(!erts_thr_progress_is_managed_thread());
1009     umrefc_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
1010     while (1) {
1011 	int tmp_ix;
1012 	erts_atomic_inc_acqb(&intrnl->umrefc[umrefc_ix].refc);
1013 	tmp_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
1014 	if (tmp_ix == umrefc_ix)
1015 	    break;
1016 	unmanaged_continue(umrefc_ix);
1017 	umrefc_ix = tmp_ix;
1018     }
1019 #ifdef ERTS_ENABLE_LOCK_CHECK
1020     {
1021 	ErtsThrPrgrData *tpd = tmp_thr_prgr_data(NULL);
1022 	tpd->is_delaying++;
1023     }
1024 #endif
1025     return (ErtsThrPrgrDelayHandle) umrefc_ix;
1026 }
1027 
1028 static ERTS_INLINE int
1029 has_reached_wakeup(ErtsThrPrgrVal wakeup)
1030 {
1031     /*
1032      * Exactly the same as erts_thr_progress_has_reached(), but
1033      * also verifies valid wakeup requests in debug mode.
1034      */
1035     ErtsThrPrgrVal current;
1036 
1037     current = read_acqb(&erts_thr_prgr__.current);
1038 
1039 #if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
1040     {
1041 	ErtsThrPrgrVal limit;
1042 	/*
1043 	 * erts_thr_progress_later() returns values which are
1044 	 * equal to 'current + 2' or 'current + 3'. That is, users
1045 	 * should never get hold of values larger than that.
1046 	 *
1047 	 * In other words, valid values are values less than 'current + 4'.
1048 	 *
1049 	 * Values larger than this won't work with the wakeup
1050 	 * algorithm.
1051 	 */
1052 
1053 	limit = current + 4;
1054 	if (limit == ERTS_THR_PRGR_VAL_WAITING)
1055 	    limit = 0;
1056 	else if (limit < current) /* Wrapped */
1057 	    limit += 1;
1058 
1059 	if (!erts_thr_progress_has_passed__(limit, wakeup))
1060 	    erts_exit(ERTS_ABORT_EXIT,
1061 		     "Invalid wakeup request value found:"
1062 		     " current=%b64u, wakeup=%b64u, limit=%b64u",
1063 		     current, wakeup, limit);
1064     }
1065 #endif
1066 
1067     if (current == wakeup)
1068 	return 1;
1069     return erts_thr_progress_has_passed__(current, wakeup);
1070 }
1071 
1072 static void
1073 request_wakeup_managed(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
1074 {
1075     ErtsThrPrgrManagedWakeupData *mwd;
1076     int ix, wix;
1077 
1078     /*
1079      * Only managed threads that aren't in waiting state
1080      * and aren't deep sleepers are allowed to call this
1081      * function.
1082      */
1083 
1084     ASSERT(tpd->is_managed);
1085     ASSERT(tpd->confirmed != ERTS_THR_PRGR_VAL_WAITING);
1086     ASSERT(!tpd->is_deep_sleeper);
1087 
1088     if (has_reached_wakeup(value)) {
1089 	wakeup_managed(tpd->id);
1090 	return;
1091     }
1092 
1093     wix = ERTS_THR_PRGR_WAKEUP_IX(value);
1094     if (tpd->wakeup_request[wix] == value)
1095 	return; /* Already got a request registered */
1096 
1097     ASSERT(erts_thr_progress_has_passed__(value,
1098 					  tpd->wakeup_request[wix]));
1099 
1100 
1101     if (tpd->confirmed == value) {
1102 	/*
1103 	 * We have already confirmed this value. We need to request
1104 	 * wakeup for a value later than our latest confirmed value in
1105 	 * order to prevent progress from reaching the requested value
1106 	 * while we are writing the request.
1107 	 *
1108 	 * It is ok to move the wakeup request forward since the only
1109 	 * guarantee we make (and can make) is that the thread will be
1110 	 * woken some time *after* the requested value has been reached.
1111 	 */
1112 	value++;
1113 	if (value == ERTS_THR_PRGR_VAL_WAITING)
1114 	    value = 0;
1115 
1116 	wix = ERTS_THR_PRGR_WAKEUP_IX(value);
1117 	if (tpd->wakeup_request[wix] == value)
1118 	    return; /* Already got a request registered */
1119 
1120 	ASSERT(erts_thr_progress_has_passed__(value,
1121 					      tpd->wakeup_request[wix]));
1122     }
1123 
1124     tpd->wakeup_request[wix] = value;
1125 
1126     mwd = intrnl->managed.data[wix];
1127 
1128     ix = erts_atomic32_inc_read_nob(&mwd->len) - 1;
1129 #if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
1130     if (ix >= intrnl->managed.no)
1131 	erts_exit(ERTS_ABORT_EXIT, "Internal error: Too many wakeup requests\n");
1132 #endif
1133     mwd->id[ix] = tpd->id;
1134 
1135     ASSERT(!erts_thr_progress_has_reached(value));
1136 
1137     /*
1138      * This thread is guaranteed to issue a full memory barrier:
1139      * - after the request has been written, but
1140      * - before the global thread progress reach the (possibly
1141      *   increased) requested wakeup value.
1142      */
1143 }
1144 
1145 static void
1146 request_wakeup_unmanaged(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
1147 {
1148     int wix, ix, id, bit;
1149     ErtsThrPrgrUnmanagedWakeupData *umwd;
1150 
1151     ASSERT(!tpd->is_managed);
1152 
1153     /*
1154      * Thread progress *can* reach and pass our requested value while
1155      * we are writing the request.
1156      */
1157 
1158     if (has_reached_wakeup(value)) {
1159 	wakeup_unmanaged(tpd->id);
1160 	return;
1161     }
1162 
1163     wix = ERTS_THR_PRGR_WAKEUP_IX(value);
1164 
1165     if (tpd->wakeup_request[wix] == value)
1166 	return; /* Already got a request registered */
1167 
1168     ASSERT(erts_thr_progress_has_passed__(value,
1169 					  tpd->wakeup_request[wix]));
1170 
1171     umwd = intrnl->unmanaged.data[wix];
1172 
1173     id = tpd->id;
1174 
1175     bit = id & ERTS_THR_PRGR_BM_MASK;
1176     ix = id >> ERTS_THR_PRGR_BM_SHIFT;
1177     ASSERT(0 <= ix && ix < umwd->low_sz);
1178     erts_atomic32_read_bor_nob(&umwd->low[ix], 1 << bit);
1179 
1180     bit = ix & ERTS_THR_PRGR_BM_MASK;
1181     ix >>= ERTS_THR_PRGR_BM_SHIFT;
1182     ASSERT(0 <= ix && ix < umwd->high_sz);
1183     erts_atomic32_read_bor_nob(&umwd->high[ix], 1 << bit);
1184 
1185     erts_atomic32_inc_mb(&umwd->len);
1186 
1187     if (erts_thr_progress_has_reached(value))
1188 	wakeup_unmanaged(tpd->id);
1189     else
1190 	tpd->wakeup_request[wix] = value;
1191 }
1192 
1193 void
1194 erts_thr_progress_wakeup(ErtsThrPrgrData *tpd,
1195 			 ErtsThrPrgrVal value)
1196 {
1197 
1198     ASSERT(!tpd->is_temporary);
1199     if (tpd->is_managed)
1200 	request_wakeup_managed(tpd, value);
1201     else
1202 	request_wakeup_unmanaged(tpd, value);
1203 }
1204 
1205 static void
1206 wakeup_unmanaged_threads(ErtsThrPrgrUnmanagedWakeupData *umwd)
1207 {
1208     int hix;
1209     for (hix = 0; hix < umwd->high_sz; hix++) {
1210 	erts_aint32_t hmask = erts_atomic32_read_nob(&umwd->high[hix]);
1211 	if (hmask) {
1212 	    int hbase = hix << ERTS_THR_PRGR_BM_SHIFT;
1213 	    int hbit;
1214 	    for (hbit = 0; hbit < ERTS_THR_PRGR_BM_BITS; hbit++) {
1215 		if (hmask & (1U << hbit)) {
1216 		    erts_aint_t lmask;
1217 		    int lix = hbase + hbit;
1218 		    ASSERT(0 <= lix && lix < umwd->low_sz);
1219 		    lmask = erts_atomic32_read_nob(&umwd->low[lix]);
1220 		    if (lmask) {
1221 			int lbase = lix << ERTS_THR_PRGR_BM_SHIFT;
1222 			int lbit;
1223 			for (lbit = 0; lbit < ERTS_THR_PRGR_BM_BITS; lbit++) {
1224 			    if (lmask & (1U << lbit)) {
1225 				int id = lbase + lbit;
1226 				wakeup_unmanaged(id);
1227 			    }
1228 			}
1229 			erts_atomic32_set_nob(&umwd->low[lix], 0);
1230 		    }
1231 		}
1232 	    }
1233 	    erts_atomic32_set_nob(&umwd->high[hix], 0);
1234 	}
1235     }
1236 }
1237 
1238 
1239 static void
1240 handle_wakeup_requests(ErtsThrPrgrVal current)
1241 {
1242     ErtsThrPrgrManagedWakeupData *mwd;
1243     ErtsThrPrgrUnmanagedWakeupData *umwd;
1244     int wix, len, i;
1245 
1246     wix = ERTS_THR_PRGR_WAKEUP_IX(current);
1247 
1248     mwd = intrnl->managed.data[wix];
1249     len = erts_atomic32_read_nob(&mwd->len);
1250     ASSERT(len >= 0);
1251     if (len) {
1252 	for (i = 0; i < len; i++)
1253 	    wakeup_managed(mwd->id[i]);
1254 	erts_atomic32_set_nob(&mwd->len, 0);
1255     }
1256 
1257     umwd = intrnl->unmanaged.data[wix];
1258     len = erts_atomic32_read_nob(&umwd->len);
1259     ASSERT(len >= 0);
1260     if (len) {
1261 	wakeup_unmanaged_threads(umwd);
1262 	erts_atomic32_set_nob(&umwd->len, 0);
1263     }
1264 
1265 }
1266 
1267 static int
1268 got_sched_wakeups(void)
1269 {
1270     int wix;
1271 
1272     ERTS_THR_MEMORY_BARRIER;
1273 
1274     for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) {
1275  	ErtsThrPrgrManagedWakeupData **mwd = intrnl->managed.data;
1276 	if (erts_atomic32_read_nob(&mwd[wix]->len))
1277 	    return 1;
1278     }
1279     for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) {
1280  	ErtsThrPrgrUnmanagedWakeupData **umwd = intrnl->unmanaged.data;
1281 	if (erts_atomic32_read_nob(&umwd[wix]->len))
1282 	    return 1;
1283     }
1284     return 0;
1285 }
1286 
1287 static erts_aint32_t
1288 block_thread(ErtsThrPrgrData *tpd)
1289 {
1290     erts_aint32_t lflgs;
1291     ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[tpd->id];
1292 
1293     do {
1294 	block_count_dec();
1295 
1296 	while (1) {
1297 	    cbp->prepare_wait(cbp->arg);
1298 	    lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
1299 	    if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
1300 		cbp->wait(cbp->arg);
1301 	    else
1302 		break;
1303 	}
1304 
1305     } while (block_count_inc());
1306 
1307     cbp->finalize_wait(cbp->arg);
1308 
1309     return lflgs;
1310 }
1311 
1312 static erts_aint32_t
1313 thr_progress_block(ErtsThrPrgrData *tpd, int wait)
1314 {
1315     erts_tse_t *event = NULL; /* Remove erroneous warning... sigh... */
1316     erts_aint32_t lflgs, bc;
1317 
1318     if (tpd->is_blocking++)
1319 	return (erts_aint32_t) 0;
1320 
1321     while (1) {
1322 	lflgs = erts_atomic32_read_bor_nob(&intrnl->misc.data.lflgs,
1323 					   ERTS_THR_PRGR_LFLG_BLOCK);
1324 	if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
1325 	    block_thread(tpd);
1326 	else
1327 	    break;
1328     }
1329 
1330 #if ERTS_THR_PRGR_PRINT_BLOCKERS
1331     erts_fprintf(stderr, "block(%d)\n", tpd->id);
1332 #endif
1333 
1334     ASSERT(ERTS_AINT_NULL
1335 	   == erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
1336 
1337     if (wait) {
1338 	event = erts_tse_fetch();
1339 	erts_tse_reset(event);
1340 	erts_atomic_set_nob(&intrnl->misc.data.blocker_event,
1341 			    (erts_aint_t) event);
1342     }
1343     if (tpd->is_managed)
1344 	erts_atomic32_dec_nob(&intrnl->misc.data.block_count);
1345     bc = erts_atomic32_read_band_mb(&intrnl->misc.data.block_count,
1346 				    ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
1347     bc &= ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING;
1348     if (wait) {
1349 	while (bc != 0) {
1350 	    erts_tse_wait(event);
1351 	    erts_tse_reset(event);
1352 	    bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
1353 	}
1354     }
1355 
1356     /* tse event returned in erts_thr_progress_unblock() */
1357     return bc;
1358 
1359 }
1360 
1361 void
1362 erts_thr_progress_block(void)
1363 {
1364     thr_progress_block(tmp_thr_prgr_data(NULL), 1);
1365 }
1366 
1367 int
1368 erts_thr_progress_fatal_error_block(ErtsThrPrgrData *tmp_tpd_bufp)
1369 {
1370     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
1371 
1372     if (!tpd) {
1373 	/*
1374 	 * We stack allocate since failure to allocate memory may
1375 	 * have caused the problem in the first place. This is ok
1376 	 * since we never complete an unblock after a fatal error
1377 	 * block.
1378 	 */
1379 	tpd = tmp_tpd_bufp;
1380 	init_tmp_thr_prgr_data(tpd);
1381     }
1382 
1383     /* Returns the number of threads that have not yet been blocked */
1384     return thr_progress_block(tpd, 0);
1385 }
1386 
1387 void
1388 erts_thr_progress_fatal_error_wait(SWord timeout) {
1389     erts_aint32_t bc;
1390     SWord time_left = timeout;
1391     ErtsMonotonicTime timeout_time;
1392     ErtsSchedulerData *esdp = erts_get_scheduler_data();
1393 
1394     /*
1395      * Counting poll intervals may give us too long a timeout
1396      * if the cpu is busy. We use timeout time to try to prevent
1397      * this. In case we haven't got time correction this may
1398      * however fail too...
1399      */
1400     timeout_time = erts_get_monotonic_time(esdp);
1401     timeout_time += ERTS_MSEC_TO_MONOTONIC((ErtsMonotonicTime) timeout);
1402 
1403     while (1) {
1404 	if (erts_milli_sleep(ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL) == 0)
1405 	    time_left -= ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL;
1406 	bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
1407 	if (bc == 0)
1408 	    break; /* Successfully blocked all managed threads */
1409 	if (time_left <= 0)
1410 	    break; /* Timeout */
1411 	if (timeout_time <= erts_get_monotonic_time(esdp))
1412 	    break; /* Timeout */
1413     }
1414 }
1415 
1416 void
1417 erts_thr_progress_unblock(void)
1418 {
1419     erts_tse_t *event;
1420     int id, break_id, sz, wakeup;
1421     ErtsThrPrgrData *tpd = thr_prgr_data(NULL);
1422 
1423     ASSERT(tpd->is_blocking);
1424     if (--tpd->is_blocking)
1425 	return;
1426 
1427     sz = intrnl->managed.no;
1428 
1429     wakeup = 1;
1430     if (!tpd->is_managed)
1431 	id = break_id = tpd->id < 0 ? 0 : tpd->id % sz;
1432     else {
1433 	break_id = tpd->id;
1434 	id = break_id + 1;
1435 	if (id >= sz)
1436 	    id = 0;
1437 	if (id == break_id)
1438 	    wakeup = 0;
1439 	erts_atomic32_inc_nob(&intrnl->misc.data.block_count);
1440     }
1441 
1442     event = ((erts_tse_t *)
1443 	     erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
1444     ASSERT(event);
1445     erts_atomic_set_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
1446 
1447     erts_atomic32_read_bor_relb(&intrnl->misc.data.block_count,
1448 				ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
1449 #if ERTS_THR_PRGR_PRINT_BLOCKERS
1450     erts_fprintf(stderr, "unblock(%d)\n", tpd->id);
1451 #endif
1452     erts_atomic32_read_band_mb(&intrnl->misc.data.lflgs,
1453 			       ~ERTS_THR_PRGR_LFLG_BLOCK);
1454 
1455     if (wakeup) {
1456 	do {
1457 	    ErtsThrPrgrVal tmp;
1458 	    tmp = read_nob(&intrnl->thr[id].data.current);
1459 	    if (tmp != ERTS_THR_PRGR_VAL_WAITING)
1460 		wakeup_managed(id);
1461 	    if (++id >= sz)
1462 		id = 0;
1463 	} while (id != break_id);
1464     }
1465 
1466     return_tmp_thr_prgr_data(tpd);
1467     erts_tse_return(event);
1468 }
1469 
1470 int
1471 erts_thr_progress_is_blocking(void)
1472 {
1473     ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
1474     return tpd && tpd->is_blocking;
1475 }
1476 
1477 void erts_thr_progress_dbg_print_state(void)
1478 {
1479     int id;
1480     int sz = intrnl->managed.no;
1481 
1482     erts_fprintf(stderr, "--- thread progress ---\n");
1483     erts_fprintf(stderr,"current=%b64u\n", erts_thr_progress_current());
1484     for (id = 0; id < sz; id++) {
1485 	ErtsThrPrgrVal current = read_nob(&intrnl->thr[id].data.current);
1486 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1487 	erts_aint32_t state_debug;
1488 	char *active, *leader;
1489 
1490 	state_debug = erts_atomic32_read_nob(&intrnl->thr[id].data.state_debug);
1491 	active = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE
1492 		  ? "true"
1493 		  : "false");
1494 	leader = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_LEADER
1495 		  ? "true"
1496 		  : "false");
1497 #endif
1498 	if (current == ERTS_THR_PRGR_VAL_WAITING)
1499 	    erts_fprintf(stderr,
1500 			 "  id=%d, current=WAITING"
1501 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1502 			 ", active=%s, leader=%s"
1503 #endif
1504 			 "\n", id
1505 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1506 			 , active, leader
1507 #endif
1508 		);
1509 	else
1510 	    erts_fprintf(stderr,
1511 			 "  id=%d, current=%b64u"
1512 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1513 			 ", active=%s, leader=%s"
1514 #endif
1515 			 "\n", id, current
1516 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1517 			 , active, leader
1518 #endif
1519 		);
1520     }
1521     erts_fprintf(stderr, "-----------------------\n");
1522 
1523 
1524 }
1525 
1526