/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2011-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description: Thread progress information. Used by lock free algorithms
 *              to determine when all involved threads are guaranteed to
 *              have passed a specific point of execution.
 *
 *              Usage instructions below.
 *
 * Author: Rickard Green
 */

/*
 * ------ Usage instructions -----------------------------------------------
 *
 * This module keeps track of the progress of a set of managed threads. Only
 * threads that behave well can be allowed to be managed. A managed thread
 * should update its thread progress frequently. Currently only scheduler
 * threads, the system-message-dispatcher threads, and the aux-thread are
 * managed threads. We typically do not want any async threads as managed
 * threads, since they cannot guarantee frequent updates of thread progress:
 * they execute user-implemented driver code that is assumed to be time
 * consuming.
 *
 * erts_thr_progress_current() returns the global current thread progress
 * value of managed threads, i.e., the latest progress value that all
 * managed threads have reached. Thread progress values are opaque.
 *
 * erts_thr_progress_has_reached(VAL) returns a value != 0 if the current
 * global thread progress has reached or passed VAL.
 *
 * erts_thr_progress_later() returns a thread progress value in the future
 * which no managed thread has yet reached.
 *
 * All threads issue a full memory barrier when reaching a new thread
 * progress value. They only reach new thread progress values in specific
 * controlled states when calling erts_thr_progress_update(). Schedulers
 * call erts_thr_progress_update() in between execution of processes,
 * when going to sleep, and when waking up.
 *
 * Sleeping managed threads are considered to have reached the next thread
 * progress value immediately. They are not woken and therefore do not
 * issue any memory barriers when reaching a new thread progress value.
 * A sleeping thread does, however, immediately issue a memory barrier upon
 * wakeup.
 *
 * Both managed and registered unmanaged threads may request wakeup when
 * the global thread progress reaches a certain value using
 * erts_thr_progress_wakeup().
 *
 * Note that thread progress values are opaque, and that you are only
 * allowed to use thread progress values retrieved from this API!
 *
 * -------------------------------------------------------------------------
 */
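
/*
 * A minimal usage sketch (illustrative only; exact signatures are in
 * erl_thr_progress.h, enqueue_for_deallocation()/deallocate() are
 * hypothetical helpers, and esdp/tpd are the caller's scheduler data and
 * thread progress data): a managed thread that wants to reclaim a shared
 * structure once every managed thread is guaranteed to have passed the
 * current point of execution might do:
 *
 *     ErtsThrPrgrVal later = erts_thr_progress_later(esdp);
 *     enqueue_for_deallocation(ptr, later);
 *     erts_thr_progress_wakeup(tpd, later);
 *     ...
 *     if (erts_thr_progress_has_reached(later))
 *         deallocate(ptr);
 *
 * Once the requested value has been reached, no managed thread can still
 * be referencing the structure.
 */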

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include <stddef.h> /* offsetof() */
#include "erl_thr_progress.h"
#include "global.h"


#define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 0

#ifdef DEBUG
#undef ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
#define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 1
#endif

#define ERTS_THR_PRGR_PRINT_LEADER 0
#define ERTS_THR_PRGR_PRINT_VAL 0
#define ERTS_THR_PRGR_PRINT_BLOCKERS 0

#define ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL 100

#define ERTS_THR_PRGR_LFLG_BLOCK       ((erts_aint32_t) (1U << 31))
#define ERTS_THR_PRGR_LFLG_NO_LEADER   ((erts_aint32_t) (1U << 30))
#define ERTS_THR_PRGR_LFLG_WAITING_UM  ((erts_aint32_t) (1U << 29))
#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER   \
                                          | ERTS_THR_PRGR_LFLG_BLOCK     \
                                          | ERTS_THR_PRGR_LFLG_WAITING_UM))

#define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \
    ((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK)

/*
 * We use a 64-bit value for thread progress. Thanks to this, wrapping of
 * the thread progress value will more or less never occur.
 *
 * On 32-bit systems we therefore need a double-word atomic.
 */
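
/*
 * Back-of-the-envelope: even at one progress increment per microsecond,
 * a 64-bit counter needs about 2^64/10^6 seconds, i.e. more than half a
 * million years, to wrap.
 */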
#undef read_acqb
#define read_acqb erts_thr_prgr_read_acqb__
#undef read_nob
#define read_nob erts_thr_prgr_read_nob__

static ERTS_INLINE void
set_mb(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
{
    erts_atomic64_set_mb(atmc, (erts_aint64_t) val);
}

static ERTS_INLINE void
set_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
{
    erts_atomic64_set_nob(atmc, (erts_aint64_t) val);
}

static ERTS_INLINE void
init_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
{
    erts_atomic64_init_nob(atmc, (erts_aint64_t) val);
}

/* #define ERTS_THR_PROGRESS_STATE_DEBUG */

#ifdef ERTS_THR_PROGRESS_STATE_DEBUG

#ifdef __GNUC__
#warning "Thread progress state debug is on"
#endif

#define ERTS_THR_PROGRESS_STATE_DEBUG_LEADER ((erts_aint32_t) (1U << 0))
#define ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE ((erts_aint32_t) (1U << 1))

#define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID)                          \
    erts_atomic32_init_nob(&intrnl->thr[(ID)].data.state_debug,         \
                           ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE)

#define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON)                \
do {                                                                    \
    erts_aint32_t state_debug__;                                        \
    state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug); \
    if ((ON))                                                           \
        state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE;          \
    else                                                                \
        state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE;         \
    erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__); \
} while (0)

#define ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON)                \
do {                                                                    \
    erts_aint32_t state_debug__;                                        \
    state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug); \
    if ((ON))                                                           \
        state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_LEADER;          \
    else                                                                \
        state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_LEADER;         \
    erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__); \
} while (0)

#else

#define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID)
#define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON)
#define ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON)

#endif /* ERTS_THR_PROGRESS_STATE_DEBUG */

#define ERTS_THR_PRGR_BLCKR_INVALID   ((erts_aint32_t) (~0U))
#define ERTS_THR_PRGR_BLCKR_UNMANAGED ((erts_aint32_t) (1U << 31))

#define ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING ((erts_aint32_t) (1U << 31))

#define ERTS_THR_PRGR_BM_BITS 32
#define ERTS_THR_PRGR_BM_SHIFT 5
#define ERTS_THR_PRGR_BM_MASK 0x1f
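
/*
 * The unmanaged-thread wakeup bitmaps below are two-level: one bit per
 * thread in the 'low' words, and one bit per low word in the 'high'
 * words. Worked example: unmanaged id 70 maps to low word 70 >> 5 == 2,
 * bit 70 & 0x1f == 6; low word 2 in turn maps to high word 0, bit 2.
 */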

#define ERTS_THR_PRGR_WAKEUP_DATA_MASK (ERTS_THR_PRGR_WAKEUP_DATA_SIZE - 1)

#define ERTS_THR_PRGR_WAKEUP_IX(V) \
    ((int) ((V) & ERTS_THR_PRGR_WAKEUP_DATA_MASK))
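
/*
 * Wakeup requests are kept in a ring of ERTS_THR_PRGR_WAKEUP_DATA_SIZE
 * slots (a power of two, defined in erl_thr_progress.h) indexed by the
 * low bits of the requested progress value; with four slots, requests
 * for the values 25 and 29 would, for example, collide in slot 1. This
 * is why requested values may only be a few increments ahead of the
 * current value (see has_reached_wakeup() below).
 */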

typedef struct {
    erts_atomic32_t len;
    int id[1];
} ErtsThrPrgrManagedWakeupData;

typedef struct {
    erts_atomic32_t len;
    int high_sz;
    int low_sz;
    erts_atomic32_t *high;
    erts_atomic32_t *low;
} ErtsThrPrgrUnmanagedWakeupData;

typedef struct {
    erts_atomic32_t lflgs;
    erts_atomic32_t block_count;
    erts_atomic_t blocker_event;
    erts_atomic32_t pref_wakeup_used;
    erts_atomic32_t managed_count;
    erts_atomic32_t managed_id;
    erts_atomic32_t unmanaged_id;
    int chk_next_ix;
    struct {
        int waiting;
        erts_atomic32_t current;
    } umrefc_ix;
} ErtsThrPrgrMiscData;

typedef struct {
    ERTS_THR_PRGR_ATOMIC current;
#ifdef ERTS_THR_PROGRESS_STATE_DEBUG
    erts_atomic32_t state_debug;
#endif
} ErtsThrPrgrElement;

typedef union {
    ErtsThrPrgrElement data;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrPrgrElement))];
} ErtsThrPrgrArray;

typedef union {
    erts_atomic_t refc;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_atomic_t))];
} ErtsThrPrgrUnmanagedRefc;

typedef struct {
    union {
        ErtsThrPrgrMiscData data;
        char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
                sizeof(ErtsThrPrgrMiscData))];
    } misc;
    ErtsThrPrgrUnmanagedRefc umrefc[2];
    ErtsThrPrgrArray *thr;
    struct {
        int no;
        ErtsThrPrgrCallbacks *callbacks;
        ErtsThrPrgrManagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
    } managed;
    struct {
        int no;
        ErtsThrPrgrCallbacks *callbacks;
        ErtsThrPrgrUnmanagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
    } unmanaged;
} ErtsThrPrgrInternalData;

static ErtsThrPrgrInternalData *intrnl;

ErtsThrPrgr erts_thr_prgr__;

erts_tsd_key_t erts_thr_prgr_data_key__;

static void handle_wakeup_requests(ErtsThrPrgrVal current);
static int got_sched_wakeups(void);
static erts_aint32_t block_thread(ErtsThrPrgrData *tpd);

static ERTS_INLINE void
wakeup_managed(int id)
{
    ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[id];
    ASSERT(0 <= id && id < intrnl->managed.no);
    cbp->wakeup(cbp->arg);
}


static ERTS_INLINE void
wakeup_unmanaged(int id)
{
    ErtsThrPrgrCallbacks *cbp = &intrnl->unmanaged.callbacks[id];
    ASSERT(0 <= id && id < intrnl->unmanaged.no);
    cbp->wakeup(cbp->arg);
}

static ERTS_INLINE ErtsThrPrgrData *
perhaps_thr_prgr_data(ErtsSchedulerData *esdp)
{
    if (esdp)
        return &esdp->thr_progress_data;
    else
        return erts_tsd_get(erts_thr_prgr_data_key__);
}

static ERTS_INLINE ErtsThrPrgrData *
thr_prgr_data(ErtsSchedulerData *esdp)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
    ASSERT(tpd);
    return tpd;
}

static void
init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
{
    tpd->id = -1;
    tpd->is_managed = 0;
    tpd->is_blocking = 0;
    tpd->is_temporary = 1;
#ifdef ERTS_ENABLE_LOCK_CHECK
    tpd->is_delaying = 0;
#endif
    erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
}

static ERTS_INLINE ErtsThrPrgrData *
tmp_thr_prgr_data(ErtsSchedulerData *esdp)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);

    if (!tpd) {
        /*
         * We only allocate the part up to the wakeup_request field, which
         * is the first field only used by registered threads.
         */
        size_t alloc_size = offsetof(ErtsThrPrgrData, wakeup_request);

        /* We may land here as a result of unmanaged_delay being called from
         * the lock counting module, which in turn might be called from within
         * the allocator, so we use plain malloc to avoid deadlocks. */
        tpd =
#ifdef ERTS_ENABLE_LOCK_COUNT
            malloc(alloc_size);
#else
            erts_alloc(ERTS_ALC_T_T_THR_PRGR_DATA, alloc_size);
#endif

        init_tmp_thr_prgr_data(tpd);
    }

    return tpd;
}

static ERTS_INLINE void
return_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
{
    if (tpd->is_temporary) {
        erts_tsd_set(erts_thr_prgr_data_key__, NULL);

#ifdef ERTS_ENABLE_LOCK_COUNT
        free(tpd);
#else
        erts_free(ERTS_ALC_T_T_THR_PRGR_DATA, tpd);
#endif
    }
}

static ERTS_INLINE int
block_count_dec(void)
{
    erts_aint32_t block_count;
    block_count = erts_atomic32_dec_read_mb(&intrnl->misc.data.block_count);
    if (block_count == 0) {
        erts_tse_t *event;
        event = ((erts_tse_t*)
                 erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
        if (event)
            erts_tse_set(event);
        return 1;
    }

    return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
}

static ERTS_INLINE int
block_count_inc(void)
{
    erts_aint32_t block_count;
    block_count = erts_atomic32_inc_read_mb(&intrnl->misc.data.block_count);
    return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
}


void
erts_thr_progress_pre_init(void)
{
    intrnl = NULL;
    erts_tsd_key_create(&erts_thr_prgr_data_key__,
                        "erts_thr_prgr_data_key");
    init_nob(&erts_thr_prgr__.current, ERTS_THR_PRGR_VAL_FIRST);
}

void
erts_thr_progress_init(int no_schedulers, int managed, int unmanaged)
{
    int i, j, um_low, um_high;
    char *ptr;
    size_t cb_sz, intrnl_sz, thr_arr_sz, m_wakeup_size, um_wakeup_size,
        tot_size;

    intrnl_sz = sizeof(ErtsThrPrgrInternalData);
    intrnl_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(intrnl_sz);

    cb_sz = sizeof(ErtsThrPrgrCallbacks)*(managed+unmanaged);
    cb_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(cb_sz);

    thr_arr_sz = sizeof(ErtsThrPrgrArray)*managed;
    ASSERT(thr_arr_sz == ERTS_ALC_CACHE_LINE_ALIGN_SIZE(thr_arr_sz));

    m_wakeup_size = sizeof(ErtsThrPrgrManagedWakeupData);
    m_wakeup_size += (managed - 1)*sizeof(int);
    m_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(m_wakeup_size);

    um_low = (unmanaged - 1)/ERTS_THR_PRGR_BM_BITS + 1;
    um_high = (um_low - 1)/ERTS_THR_PRGR_BM_BITS + 1;
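    /* E.g., unmanaged == 100 unmanaged threads gives um_low == 4 low
     * bitmap words (ceil(100/32)) and um_high == 1 high bitmap word
     * (ceil(4/32)). */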

    um_wakeup_size = sizeof(ErtsThrPrgrUnmanagedWakeupData);
    um_wakeup_size += (um_high + um_low)*sizeof(erts_atomic32_t);
    um_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(um_wakeup_size);

    tot_size = intrnl_sz;
    tot_size += cb_sz;
    tot_size += thr_arr_sz;
    tot_size += m_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE;
    tot_size += um_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE;

    ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_THR_PRGR_IDATA,
                                             tot_size);

    intrnl = (ErtsThrPrgrInternalData *) ptr;
    ptr += intrnl_sz;

    erts_atomic32_init_nob(&intrnl->misc.data.lflgs,
                           ERTS_THR_PRGR_LFLG_NO_LEADER);
    erts_atomic32_init_nob(&intrnl->misc.data.block_count,
                           (ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING
                            | (erts_aint32_t) managed));
    erts_atomic_init_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
    erts_atomic32_init_nob(&intrnl->misc.data.pref_wakeup_used, 0);
    erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0);
    erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers);
    erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1);
    intrnl->misc.data.chk_next_ix = 0;
    intrnl->misc.data.umrefc_ix.waiting = -1;
    erts_atomic32_init_nob(&intrnl->misc.data.umrefc_ix.current, 0);

    erts_atomic_init_nob(&intrnl->umrefc[0].refc, (erts_aint_t) 0);
    erts_atomic_init_nob(&intrnl->umrefc[1].refc, (erts_aint_t) 0);

    intrnl->thr = (ErtsThrPrgrArray *) ptr;
    ptr += thr_arr_sz;
    for (i = 0; i < managed; i++)
        init_nob(&intrnl->thr[i].data.current, 0);

    intrnl->managed.callbacks = (ErtsThrPrgrCallbacks *) ptr;
    intrnl->unmanaged.callbacks = &intrnl->managed.callbacks[managed];
    ptr += cb_sz;

    intrnl->managed.no = managed;
    for (i = 0; i < managed; i++) {
        intrnl->managed.callbacks[i].arg = NULL;
        intrnl->managed.callbacks[i].wakeup = NULL;
    }

    intrnl->unmanaged.no = unmanaged;
    for (i = 0; i < unmanaged; i++) {
        intrnl->unmanaged.callbacks[i].arg = NULL;
        intrnl->unmanaged.callbacks[i].wakeup = NULL;
    }

    for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
        intrnl->managed.data[i] = (ErtsThrPrgrManagedWakeupData *) ptr;
        erts_atomic32_init_nob(&intrnl->managed.data[i]->len, 0);
        ptr += m_wakeup_size;
    }

    for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
        erts_atomic32_t *bm;
        intrnl->unmanaged.data[i] = (ErtsThrPrgrUnmanagedWakeupData *) ptr;
        erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->len, 0);
        bm = (erts_atomic32_t *) (ptr + sizeof(ErtsThrPrgrUnmanagedWakeupData));
        intrnl->unmanaged.data[i]->high = bm;
        intrnl->unmanaged.data[i]->high_sz = um_high;
        for (j = 0; j < um_high; j++)
            erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->high[j], 0);
        intrnl->unmanaged.data[i]->low
            = &intrnl->unmanaged.data[i]->high[um_high];
        intrnl->unmanaged.data[i]->low_sz = um_low;
        for (j = 0; j < um_low; j++)
            erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->low[j], 0);
        ptr += um_wakeup_size;
    }
    ERTS_THR_MEMORY_BARRIER;
}

static void
init_wakeup_request_array(ErtsThrPrgrVal *w)
{
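    /*
     * Initialize all wakeup request slots to values safely in the
     * past, so that the "already got a request registered" checks in
     * request_wakeup_managed()/request_wakeup_unmanaged() cannot match
     * a fresh request by accident.
     */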
    int i;
    ErtsThrPrgrVal current;

    current = read_acqb(&erts_thr_prgr__.current);
    for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
        w[i] = current - ((ErtsThrPrgrVal) (ERTS_THR_PRGR_WAKEUP_DATA_SIZE + i));
        if (w[i] > current)
            w[i]--;
    }
}

ErtsThrPrgrData *erts_thr_progress_data(void) {
    return erts_tsd_get(erts_thr_prgr_data_key__);
}

void
erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
    int is_blocking = 0;

    if (tpd) {
        if (!tpd->is_temporary)
            erts_exit(ERTS_ABORT_EXIT,
                      "%s:%d:%s(): Double register of thread\n",
                      __FILE__, __LINE__, __func__);
        is_blocking = tpd->is_blocking;
        return_tmp_thr_prgr_data(tpd);
    }

    /*
     * We only allocate the part up to the leader field,
     * which is the first field only used by managed threads.
     */
    tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA,
                     offsetof(ErtsThrPrgrData, leader));
    tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->misc.data.unmanaged_id);
    tpd->is_managed = 0;
    tpd->is_blocking = is_blocking;
    tpd->is_temporary = 0;
#ifdef ERTS_ENABLE_LOCK_CHECK
    tpd->is_delaying = 0;
#endif
    ASSERT(tpd->id >= 0);
    if (tpd->id >= intrnl->unmanaged.no)
        erts_exit(ERTS_ABORT_EXIT,
                  "%s:%d:%s(): Too many unmanaged registered threads\n",
                  __FILE__, __LINE__, __func__);

    init_wakeup_request_array(&tpd->wakeup_request[0]);
    erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);

    ASSERT(callbacks->wakeup);

    intrnl->unmanaged.callbacks[tpd->id] = *callbacks;
}


ErtsThrPrgrData *
erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
                                          ErtsThrPrgrCallbacks *callbacks,
                                          int pref_wakeup,
                                          int deep_sleeper)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
    int is_blocking = 0, managed;

    if (tpd) {
        if (!tpd->is_temporary)
            erts_exit(ERTS_ABORT_EXIT,
                      "%s:%d:%s(): Double register of thread\n",
                      __FILE__, __LINE__, __func__);
        is_blocking = tpd->is_blocking;
        return_tmp_thr_prgr_data(tpd);
    }

    if (esdp)
        tpd = &esdp->thr_progress_data;
    else
        tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, sizeof(ErtsThrPrgrData));

    if (pref_wakeup
        && !erts_atomic32_xchg_nob(&intrnl->misc.data.pref_wakeup_used, 1))
        tpd->id = 0;
    else if (esdp)
        tpd->id = (int) esdp->no;
    else
        tpd->id = erts_atomic32_inc_read_nob(&intrnl->misc.data.managed_id);
    ASSERT(tpd->id >= 0);
    if (tpd->id >= intrnl->managed.no)
        erts_exit(ERTS_ABORT_EXIT,
                  "%s:%d:%s(): Too many managed registered threads\n",
                  __FILE__, __LINE__, __func__);

    tpd->is_managed = 1;
    tpd->is_blocking = is_blocking;
    tpd->is_temporary = 0;
    tpd->is_deep_sleeper = deep_sleeper;
#ifdef ERTS_ENABLE_LOCK_CHECK
    tpd->is_delaying = 1;
#endif

    init_wakeup_request_array(&tpd->wakeup_request[0]);

    ERTS_THR_PROGRESS_STATE_DEBUG_INIT(tpd->id);

    tpd->leader = 0;
    tpd->active = 1;
    tpd->confirmed = 0;
    tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
    erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);

    erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);

    ASSERT(callbacks->wakeup);
    ASSERT(callbacks->prepare_wait);
    ASSERT(callbacks->wait);
    ASSERT(callbacks->finalize_wait);

    intrnl->managed.callbacks[tpd->id] = *callbacks;

    callbacks->prepare_wait(callbacks->arg);
    managed = erts_atomic32_inc_read_relb(&intrnl->misc.data.managed_count);
    if (managed != intrnl->managed.no) {
        /* Wait until all managed threads have registered... */
        do {
            callbacks->wait(callbacks->arg);
            callbacks->prepare_wait(callbacks->arg);
            managed = erts_atomic32_read_acqb(&intrnl->misc.data.managed_count);
        } while (managed != intrnl->managed.no);
    }
    else {
        int id;
        /* All managed threads have registered; let's go... */
        for (id = 0; id < managed; id++)
            if (id != tpd->id)
                wakeup_managed(id);
    }
    callbacks->finalize_wait(callbacks->arg);
    return tpd;
}
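
/*
 * Registration sketch (illustrative only; the callback struct is
 * declared in erl_thr_progress.h and the my_* names are hypothetical):
 * a managed thread supplies callbacks that are typically backed by a
 * thread event (erts_tse_t):
 *
 *     ErtsThrPrgrCallbacks cbs;
 *     cbs.arg           = (void *) my_wait_state;
 *     cbs.wakeup        = my_wakeup;        // e.g., based on erts_tse_set()
 *     cbs.prepare_wait  = my_prepare_wait;  // e.g., based on erts_tse_reset()
 *     cbs.wait          = my_wait;          // e.g., based on erts_tse_wait()
 *     cbs.finalize_wait = my_finalize_wait;
 *     tpd = erts_thr_progress_register_managed_thread(esdp, &cbs, 0, 0);
 *
 * Note that the call above does not return until *all* managed threads
 * have registered.
 */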

static ERTS_INLINE int
leader_update(ErtsThrPrgrData *tpd)
{
#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_check_exact(NULL, 0);
#endif
    if (!tpd->leader) {
        /* Probably need to block... */
        block_thread(tpd);
    }
    else {
        ErtsThrPrgrVal current;
        int ix, chk_next_ix, umrefc_ix, my_ix, no_managed, waiting_unmanaged;
        erts_aint32_t lflgs;
        ErtsThrPrgrVal next;
        erts_aint_t refc;

        my_ix = tpd->id;

        if (tpd->leader_state.current == ERTS_THR_PRGR_VAL_WAITING) {
            /* Took over as leader from another thread */
            tpd->leader_state.current = read_nob(&erts_thr_prgr__.current);
            tpd->leader_state.next = tpd->leader_state.current;
            tpd->leader_state.next++;
            if (tpd->leader_state.next == ERTS_THR_PRGR_VAL_WAITING)
                tpd->leader_state.next = 0;
            tpd->leader_state.chk_next_ix = intrnl->misc.data.chk_next_ix;
            tpd->leader_state.umrefc_ix.waiting = intrnl->misc.data.umrefc_ix.waiting;
            tpd->leader_state.umrefc_ix.current =
                (int) erts_atomic32_read_nob(&intrnl->misc.data.umrefc_ix.current);

            if (tpd->confirmed == tpd->leader_state.current) {
                ErtsThrPrgrVal val = tpd->leader_state.current + 1;
                if (val == ERTS_THR_PRGR_VAL_WAITING)
                    val = 0;
                tpd->confirmed = val;
                set_mb(&intrnl->thr[my_ix].data.current, val);
            }
        }


        next = tpd->leader_state.next;

        waiting_unmanaged = 0;
        umrefc_ix = -1; /* Shut up annoying warning */

        chk_next_ix = tpd->leader_state.chk_next_ix;
        no_managed = intrnl->managed.no;
        ASSERT(0 <= chk_next_ix && chk_next_ix <= no_managed);
        /* Check managed threads */
        if (chk_next_ix < no_managed) {
            for (ix = chk_next_ix; ix < no_managed; ix++) {
                ErtsThrPrgrVal tmp;
                if (ix == my_ix)
                    continue;
                tmp = read_nob(&intrnl->thr[ix].data.current);
                if (tmp != next && tmp != ERTS_THR_PRGR_VAL_WAITING) {
                    tpd->leader_state.chk_next_ix = ix;
                    ASSERT(erts_thr_progress_has_passed__(next, tmp));
                    goto done;
                }
            }
        }

        /*
         * Check unmanaged threads; if the epoch we are looking at still
         * has referring unmanaged threads, flip to the other epoch and
         * wait for the old one to drain.
         */
        waiting_unmanaged = tpd->leader_state.umrefc_ix.waiting != -1;
        umrefc_ix = (waiting_unmanaged
                     ? tpd->leader_state.umrefc_ix.waiting
                     : tpd->leader_state.umrefc_ix.current);
        refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
        ASSERT(refc >= 0);
        if (refc != 0) {
            int new_umrefc_ix;

            if (waiting_unmanaged)
                goto done;

            new_umrefc_ix = (umrefc_ix + 1) & 0x1;
            tpd->leader_state.umrefc_ix.waiting = umrefc_ix;
            tpd->leader_state.chk_next_ix = no_managed;
            erts_atomic32_set_nob(&intrnl->misc.data.umrefc_ix.current,
                                  (erts_aint32_t) new_umrefc_ix);
            tpd->leader_state.umrefc_ix.current = new_umrefc_ix;
            ETHR_MEMBAR(ETHR_StoreLoad);
            refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
            ASSERT(refc >= 0);
            waiting_unmanaged = 1;
            if (refc != 0)
                goto done;
        }

        /* Make progress */
        current = next;

        next++;
        if (next == ERTS_THR_PRGR_VAL_WAITING)
            next = 0;

        set_nob(&intrnl->thr[my_ix].data.current, next);
        set_mb(&erts_thr_prgr__.current, current);
        tpd->confirmed = next;
        tpd->leader_state.next = next;
        tpd->leader_state.current = current;

#if ERTS_THR_PRGR_PRINT_VAL
        if (current % 1000 == 0)
            erts_fprintf(stderr, "%b64u\n", current);
#endif
        handle_wakeup_requests(current);

        if (waiting_unmanaged) {
            waiting_unmanaged = 0;
            tpd->leader_state.umrefc_ix.waiting = -1;
            erts_atomic32_read_band_nob(&intrnl->misc.data.lflgs,
                                        ~ERTS_THR_PRGR_LFLG_WAITING_UM);
        }
        tpd->leader_state.chk_next_ix = 0;

    done:

        if (tpd->active) {
            lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
            if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
                (void) block_thread(tpd);
        }
        else {
            int force_wakeup_check = 0;
            erts_aint32_t set_flags = ERTS_THR_PRGR_LFLG_NO_LEADER;
            tpd->leader = 0;
            tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
#if ERTS_THR_PRGR_PRINT_LEADER
            erts_fprintf(stderr, "L <- %d\n", tpd->id);
#endif

            ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0);

            intrnl->misc.data.umrefc_ix.waiting
                = tpd->leader_state.umrefc_ix.waiting;
            if (waiting_unmanaged)
                set_flags |= ERTS_THR_PRGR_LFLG_WAITING_UM;

            lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs,
                                                set_flags);
            lflgs |= set_flags;
            if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
                lflgs = block_thread(tpd);

            if (waiting_unmanaged) {
                /* Need to check umrefc again */
                ETHR_MEMBAR(ETHR_StoreLoad);
                refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
                if (refc == 0) {
                    /* Need to force wakeup check */
                    force_wakeup_check = 1;
                }
            }

            if ((force_wakeup_check
                 || ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
                               | ERTS_THR_PRGR_LFLG_WAITING_UM
                               | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
                     == ERTS_THR_PRGR_LFLG_NO_LEADER))
                && got_sched_wakeups()) {
                /* Someone needs to make progress */
                wakeup_managed(tpd->id);
            }
        }
    }

    return tpd->leader;
}

static int
update(ErtsThrPrgrData *tpd)
{
    int res;
    ErtsThrPrgrVal val;

    if (tpd->leader)
        res = 1;
    else {
        erts_aint32_t lflgs;
        res = 0;
        val = read_acqb(&erts_thr_prgr__.current);
        if (tpd->confirmed == val) {
            val++;
            if (val == ERTS_THR_PRGR_VAL_WAITING)
                val = 0;
            tpd->confirmed = val;
            set_mb(&intrnl->thr[tpd->id].data.current, val);
        }

        lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
        if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
            res = 1; /* Need to block in leader_update() */

        if ((lflgs & ERTS_THR_PRGR_LFLG_NO_LEADER)
            && (tpd->active || ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0)) {
            /* Try to take over leadership... */
            erts_aint32_t olflgs;
            olflgs = erts_atomic32_read_band_acqb(
                &intrnl->misc.data.lflgs,
                ~ERTS_THR_PRGR_LFLG_NO_LEADER);
            if (olflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) {
                tpd->leader = 1;
#if ERTS_THR_PRGR_PRINT_LEADER
                erts_fprintf(stderr, "L -> %d\n", tpd->id);
#endif
                ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 1);
            }
        }
        res |= tpd->leader;
    }
    return res;
}

int
erts_thr_progress_update(ErtsThrPrgrData *tpd)
{
    return update(tpd);
}


int
erts_thr_progress_leader_update(ErtsThrPrgrData *tpd)
{
    return leader_update(tpd);
}

void
erts_thr_progress_prepare_wait(ErtsThrPrgrData *tpd)
{
    erts_aint32_t lflgs;

#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_check_exact(NULL, 0);
#endif

    block_count_dec();

    tpd->confirmed = ERTS_THR_PRGR_VAL_WAITING;
    set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING);

    lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);

    if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
                  | ERTS_THR_PRGR_LFLG_WAITING_UM
                  | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
        == ERTS_THR_PRGR_LFLG_NO_LEADER
        && got_sched_wakeups()) {
        /* Someone needs to make progress */
        if (tpd->is_deep_sleeper)
            wakeup_managed(1);
        else
            wakeup_managed(tpd->id);
    }
}

void
erts_thr_progress_finalize_wait(ErtsThrPrgrData *tpd)
{
    ErtsThrPrgrVal current, val;

#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_check_exact(NULL, 0);
#endif

    /*
     * We aren't allowed to continue until our thread
     * progress is past global current.
     */
    val = current = read_acqb(&erts_thr_prgr__.current);
    while (1) {
        val++;
        if (val == ERTS_THR_PRGR_VAL_WAITING)
            val = 0;
        tpd->confirmed = val;
        set_mb(&intrnl->thr[tpd->id].data.current, val);
        val = read_acqb(&erts_thr_prgr__.current);
        if (current == val)
            break;
        current = val;
    }
    if (block_count_inc())
        block_thread(tpd);
    if (update(tpd))
        leader_update(tpd);
}

void
erts_thr_progress_active(ErtsThrPrgrData *tpd, int on)
{

#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_check_exact(NULL, 0);
#endif

    ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(tpd->id, on);

    if (on) {
        ASSERT(!tpd->active);
        tpd->active = 1;
        erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
    }
    else {
        ASSERT(tpd->active);
        tpd->active = 0;
        erts_atomic32_dec_nob(&intrnl->misc.data.lflgs);
        if (update(tpd))
            leader_update(tpd);
    }

#ifdef DEBUG
    {
        erts_aint32_t n = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
        n &= ERTS_THR_PRGR_LFLG_ACTIVE_MASK;
        ASSERT(tpd->active <= n && n <= intrnl->managed.no);
    }
#endif

}

static ERTS_INLINE void
unmanaged_continue(ErtsThrPrgrDelayHandle handle)
{
    int umrefc_ix = (int) handle;
    erts_aint_t refc;

    ASSERT(umrefc_ix == 0 || umrefc_ix == 1);
    refc = erts_atomic_dec_read_relb(&intrnl->umrefc[umrefc_ix].refc);
    ASSERT(refc >= 0);
    if (refc == 0) {
        erts_aint_t lflgs;
        ERTS_THR_READ_MEMORY_BARRIER;
        lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
        if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
                      | ERTS_THR_PRGR_LFLG_WAITING_UM
                      | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
            == (ERTS_THR_PRGR_LFLG_NO_LEADER|ERTS_THR_PRGR_LFLG_WAITING_UM)
            && got_sched_wakeups()) {
            /* Others waiting for us... */
            wakeup_managed(1);
        }
    }
}

void
erts_thr_progress_unmanaged_continue__(ErtsThrPrgrDelayHandle handle)
{
#ifdef ERTS_ENABLE_LOCK_CHECK
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
    ERTS_LC_ASSERT(tpd && tpd->is_delaying);
    tpd->is_delaying--;
    ASSERT(tpd->is_delaying >= 0);
    if (!tpd->is_delaying)
        return_tmp_thr_prgr_data(tpd);
#endif
    ASSERT(!erts_thr_progress_is_managed_thread());

    unmanaged_continue(handle);
}

ErtsThrPrgrDelayHandle
erts_thr_progress_unmanaged_delay__(void)
{
    int umrefc_ix;
    ASSERT(!erts_thr_progress_is_managed_thread());
    umrefc_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
    while (1) {
        int tmp_ix;
        erts_atomic_inc_acqb(&intrnl->umrefc[umrefc_ix].refc);
        tmp_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
        if (tmp_ix == umrefc_ix)
            break;
        unmanaged_continue(umrefc_ix);
        umrefc_ix = tmp_ix;
    }
#ifdef ERTS_ENABLE_LOCK_CHECK
    {
        ErtsThrPrgrData *tpd = tmp_thr_prgr_data(NULL);
        tpd->is_delaying++;
    }
#endif
    return (ErtsThrPrgrDelayHandle) umrefc_ix;
}
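
/*
 * Sketch of the delay/continue pattern for unmanaged threads (the
 * public wrappers around these __-suffixed functions live in
 * erl_thr_progress.h):
 *
 *     ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();
 *     ... read data that managed threads may be about to retire ...
 *     erts_thr_progress_unmanaged_continue(dhndl);
 *
 * While the handle is held, the referenced umrefc epoch stays non-zero,
 * so the leader cannot complete the unmanaged part of a progress step
 * (see leader_update() above).
 */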

static ERTS_INLINE int
has_reached_wakeup(ErtsThrPrgrVal wakeup)
{
    /*
     * Exactly the same as erts_thr_progress_has_reached(), but
     * also verify valid wakeup requests in debug mode.
     */
    ErtsThrPrgrVal current;

    current = read_acqb(&erts_thr_prgr__.current);

#if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
    {
        ErtsThrPrgrVal limit;
        /*
         * erts_thr_progress_later() returns values which are
         * equal to 'current + 2', or 'current + 3'. That is, users
         * should never get a hold of values larger than that.
         *
         * That is, valid values are values less than 'current + 4'.
         *
         * Values larger than this won't work with the wakeup
         * algorithm.
         */

        limit = current + 4;
        if (limit == ERTS_THR_PRGR_VAL_WAITING)
            limit = 0;
        else if (limit < current) /* Wrapped */
            limit += 1;

        if (!erts_thr_progress_has_passed__(limit, wakeup))
            erts_exit(ERTS_ABORT_EXIT,
                      "Invalid wakeup request value found:"
                      " current=%b64u, wakeup=%b64u, limit=%b64u",
                      current, wakeup, limit);
    }
#endif

    if (current == wakeup)
        return 1;
    return erts_thr_progress_has_passed__(current, wakeup);
}

static void
request_wakeup_managed(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
{
    ErtsThrPrgrManagedWakeupData *mwd;
    int ix, wix;

    /*
     * Only managed threads that aren't in waiting state
     * and aren't deep sleepers are allowed to call this
     * function.
     */

    ASSERT(tpd->is_managed);
    ASSERT(tpd->confirmed != ERTS_THR_PRGR_VAL_WAITING);
    ASSERT(!tpd->is_deep_sleeper);

    if (has_reached_wakeup(value)) {
        wakeup_managed(tpd->id);
        return;
    }

    wix = ERTS_THR_PRGR_WAKEUP_IX(value);
    if (tpd->wakeup_request[wix] == value)
        return; /* Already got a request registered */

    ASSERT(erts_thr_progress_has_passed__(value,
                                          tpd->wakeup_request[wix]));


    if (tpd->confirmed == value) {
        /*
         * We have already confirmed this value. We need to request
         * wakeup for a value later than our latest confirmed value in
         * order to prevent progress from reaching the requested value
         * while we are writing the request.
         *
         * It is ok to move the wakeup request forward, since the only
         * guarantee we make (and can make) is that the thread will be
         * woken some time *after* the requested value has been reached.
         */
        value++;
        if (value == ERTS_THR_PRGR_VAL_WAITING)
            value = 0;

        wix = ERTS_THR_PRGR_WAKEUP_IX(value);
        if (tpd->wakeup_request[wix] == value)
            return; /* Already got a request registered */

        ASSERT(erts_thr_progress_has_passed__(value,
                                              tpd->wakeup_request[wix]));
    }

    tpd->wakeup_request[wix] = value;

    mwd = intrnl->managed.data[wix];

    ix = erts_atomic32_inc_read_nob(&mwd->len) - 1;
#if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
    if (ix >= intrnl->managed.no)
        erts_exit(ERTS_ABORT_EXIT, "Internal error: Too many wakeup requests\n");
#endif
    mwd->id[ix] = tpd->id;

    ASSERT(!erts_thr_progress_has_reached(value));

    /*
     * This thread is guaranteed to issue a full memory barrier:
     * - after the request has been written, but
     * - before the global thread progress reaches the (possibly
     *   increased) requested wakeup value.
     */
}

static void
request_wakeup_unmanaged(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
{
    int wix, ix, id, bit;
    ErtsThrPrgrUnmanagedWakeupData *umwd;

    ASSERT(!tpd->is_managed);

    /*
     * Thread progress *can* reach and pass our requested value while
     * we are writing the request.
     */

    if (has_reached_wakeup(value)) {
        wakeup_unmanaged(tpd->id);
        return;
    }

    wix = ERTS_THR_PRGR_WAKEUP_IX(value);

    if (tpd->wakeup_request[wix] == value)
        return; /* Already got a request registered */

    ASSERT(erts_thr_progress_has_passed__(value,
                                          tpd->wakeup_request[wix]));

    umwd = intrnl->unmanaged.data[wix];

    id = tpd->id;

    bit = id & ERTS_THR_PRGR_BM_MASK;
    ix = id >> ERTS_THR_PRGR_BM_SHIFT;
    ASSERT(0 <= ix && ix < umwd->low_sz);
    erts_atomic32_read_bor_nob(&umwd->low[ix], 1 << bit);

    bit = ix & ERTS_THR_PRGR_BM_MASK;
    ix >>= ERTS_THR_PRGR_BM_SHIFT;
    ASSERT(0 <= ix && ix < umwd->high_sz);
    erts_atomic32_read_bor_nob(&umwd->high[ix], 1 << bit);

    erts_atomic32_inc_mb(&umwd->len);

    if (erts_thr_progress_has_reached(value))
        wakeup_unmanaged(tpd->id);
    else
        tpd->wakeup_request[wix] = value;
}

void
erts_thr_progress_wakeup(ErtsThrPrgrData *tpd,
                         ErtsThrPrgrVal value)
{

    ASSERT(!tpd->is_temporary);
    if (tpd->is_managed)
        request_wakeup_managed(tpd, value);
    else
        request_wakeup_unmanaged(tpd, value);
}
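
/*
 * Illustrative call sequence (assuming a registered, non-temporary
 * thread; see erl_thr_progress.h for the exact erts_thr_progress_later()
 * signature): rather than spinning on erts_thr_progress_has_reached(),
 * a thread schedules a wakeup and goes to sleep via its registered wait
 * callbacks:
 *
 *     ErtsThrPrgrVal val = erts_thr_progress_later(esdp);
 *     erts_thr_progress_wakeup(tpd, val);
 *     ... sleep; the wakeup callback fires some time *after* global
 *     thread progress has reached or passed 'val' ...
 */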

static void
wakeup_unmanaged_threads(ErtsThrPrgrUnmanagedWakeupData *umwd)
{
    int hix;
    for (hix = 0; hix < umwd->high_sz; hix++) {
        erts_aint32_t hmask = erts_atomic32_read_nob(&umwd->high[hix]);
        if (hmask) {
            int hbase = hix << ERTS_THR_PRGR_BM_SHIFT;
            int hbit;
            for (hbit = 0; hbit < ERTS_THR_PRGR_BM_BITS; hbit++) {
                if (hmask & (1U << hbit)) {
                    erts_aint_t lmask;
                    int lix = hbase + hbit;
                    ASSERT(0 <= lix && lix < umwd->low_sz);
                    lmask = erts_atomic32_read_nob(&umwd->low[lix]);
                    if (lmask) {
                        int lbase = lix << ERTS_THR_PRGR_BM_SHIFT;
                        int lbit;
                        for (lbit = 0; lbit < ERTS_THR_PRGR_BM_BITS; lbit++) {
                            if (lmask & (1U << lbit)) {
                                int id = lbase + lbit;
                                wakeup_unmanaged(id);
                            }
                        }
                        erts_atomic32_set_nob(&umwd->low[lix], 0);
                    }
                }
            }
            erts_atomic32_set_nob(&umwd->high[hix], 0);
        }
    }
}


static void
handle_wakeup_requests(ErtsThrPrgrVal current)
{
    ErtsThrPrgrManagedWakeupData *mwd;
    ErtsThrPrgrUnmanagedWakeupData *umwd;
    int wix, len, i;

    wix = ERTS_THR_PRGR_WAKEUP_IX(current);

    mwd = intrnl->managed.data[wix];
    len = erts_atomic32_read_nob(&mwd->len);
    ASSERT(len >= 0);
    if (len) {
        for (i = 0; i < len; i++)
            wakeup_managed(mwd->id[i]);
        erts_atomic32_set_nob(&mwd->len, 0);
    }

    umwd = intrnl->unmanaged.data[wix];
    len = erts_atomic32_read_nob(&umwd->len);
    ASSERT(len >= 0);
    if (len) {
        wakeup_unmanaged_threads(umwd);
        erts_atomic32_set_nob(&umwd->len, 0);
    }

}

static int
got_sched_wakeups(void)
{
    int wix;

    ERTS_THR_MEMORY_BARRIER;

    for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) {
        ErtsThrPrgrManagedWakeupData **mwd = intrnl->managed.data;
        if (erts_atomic32_read_nob(&mwd[wix]->len))
            return 1;
    }
    for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) {
        ErtsThrPrgrUnmanagedWakeupData **umwd = intrnl->unmanaged.data;
        if (erts_atomic32_read_nob(&umwd[wix]->len))
            return 1;
    }
    return 0;
}

static erts_aint32_t
block_thread(ErtsThrPrgrData *tpd)
{
    erts_aint32_t lflgs;
    ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[tpd->id];

    do {
        block_count_dec();

        while (1) {
            cbp->prepare_wait(cbp->arg);
            lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
            if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
                cbp->wait(cbp->arg);
            else
                break;
        }

    } while (block_count_inc());

    cbp->finalize_wait(cbp->arg);

    return lflgs;
}

static erts_aint32_t
thr_progress_block(ErtsThrPrgrData *tpd, int wait)
{
    erts_tse_t *event = NULL; /* Remove erroneous warning... sigh... */
    erts_aint32_t lflgs, bc;

    if (tpd->is_blocking++)
        return (erts_aint32_t) 0;

    while (1) {
        lflgs = erts_atomic32_read_bor_nob(&intrnl->misc.data.lflgs,
                                           ERTS_THR_PRGR_LFLG_BLOCK);
        if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
            block_thread(tpd);
        else
            break;
    }

#if ERTS_THR_PRGR_PRINT_BLOCKERS
    erts_fprintf(stderr, "block(%d)\n", tpd->id);
#endif

    ASSERT(ERTS_AINT_NULL
           == erts_atomic_read_nob(&intrnl->misc.data.blocker_event));

    if (wait) {
        event = erts_tse_fetch();
        erts_tse_reset(event);
        erts_atomic_set_nob(&intrnl->misc.data.blocker_event,
                            (erts_aint_t) event);
    }
    if (tpd->is_managed)
        erts_atomic32_dec_nob(&intrnl->misc.data.block_count);
    bc = erts_atomic32_read_band_mb(&intrnl->misc.data.block_count,
                                    ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
    bc &= ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING;
    if (wait) {
        while (bc != 0) {
            erts_tse_wait(event);
            erts_tse_reset(event);
            bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
        }
    }

    /* tse event returned in erts_thr_progress_unblock() */
    return bc;

}

void
erts_thr_progress_block(void)
{
    thr_progress_block(tmp_thr_prgr_data(NULL), 1);
}
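
/*
 * Blocking sketch: erts_thr_progress_block() returns only when every
 * managed thread is blocked or sleeping, giving the caller exclusive
 * access until the matching erts_thr_progress_unblock():
 *
 *     erts_thr_progress_block();
 *     ... mutate state otherwise protected by thread progress ...
 *     erts_thr_progress_unblock();
 *
 * Nested block/unblock pairs in the same thread are counted via
 * tpd->is_blocking; only the outermost pair takes effect.
 */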

int
erts_thr_progress_fatal_error_block(ErtsThrPrgrData *tmp_tpd_bufp)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);

    if (!tpd) {
        /*
         * We stack allocate since failure to allocate memory may
         * have caused the problem in the first place. This is ok
         * since we never complete an unblock after a fatal error
         * block.
         */
        tpd = tmp_tpd_bufp;
        init_tmp_thr_prgr_data(tpd);
    }

    /* Returns the number of threads that have not yet been blocked */
    return thr_progress_block(tpd, 0);
}

void
erts_thr_progress_fatal_error_wait(SWord timeout) {
    erts_aint32_t bc;
    SWord time_left = timeout;
    ErtsMonotonicTime timeout_time;
    ErtsSchedulerData *esdp = erts_get_scheduler_data();

    /*
     * Counting poll intervals may give us too long a timeout
     * if the cpu is busy. We use timeout time to try to prevent
     * this. In case we haven't got time correction, this may
     * however fail too...
     */
    timeout_time = erts_get_monotonic_time(esdp);
    timeout_time += ERTS_MSEC_TO_MONOTONIC((ErtsMonotonicTime) timeout);

    while (1) {
        if (erts_milli_sleep(ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL) == 0)
            time_left -= ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL;
        bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
        if (bc == 0)
            break; /* Successfully blocked all managed threads */
        if (time_left <= 0)
            break; /* Timeout */
        if (timeout_time <= erts_get_monotonic_time(esdp))
            break; /* Timeout */
    }
}

void
erts_thr_progress_unblock(void)
{
    erts_tse_t *event;
    int id, break_id, sz, wakeup;
    ErtsThrPrgrData *tpd = thr_prgr_data(NULL);

    ASSERT(tpd->is_blocking);
    if (--tpd->is_blocking)
        return;

    sz = intrnl->managed.no;

    wakeup = 1;
    if (!tpd->is_managed)
        id = break_id = tpd->id < 0 ? 0 : tpd->id % sz;
    else {
        break_id = tpd->id;
        id = break_id + 1;
        if (id >= sz)
            id = 0;
        if (id == break_id)
            wakeup = 0;
        erts_atomic32_inc_nob(&intrnl->misc.data.block_count);
    }

    event = ((erts_tse_t *)
             erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
    ASSERT(event);
    erts_atomic_set_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);

    erts_atomic32_read_bor_relb(&intrnl->misc.data.block_count,
                                ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
#if ERTS_THR_PRGR_PRINT_BLOCKERS
    erts_fprintf(stderr, "unblock(%d)\n", tpd->id);
#endif
    erts_atomic32_read_band_mb(&intrnl->misc.data.lflgs,
                               ~ERTS_THR_PRGR_LFLG_BLOCK);

    if (wakeup) {
        do {
            ErtsThrPrgrVal tmp;
            tmp = read_nob(&intrnl->thr[id].data.current);
            if (tmp != ERTS_THR_PRGR_VAL_WAITING)
                wakeup_managed(id);
            if (++id >= sz)
                id = 0;
        } while (id != break_id);
    }

    return_tmp_thr_prgr_data(tpd);
    erts_tse_return(event);
}

int
erts_thr_progress_is_blocking(void)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
    return tpd && tpd->is_blocking;
}

void erts_thr_progress_dbg_print_state(void)
{
    int id;
    int sz = intrnl->managed.no;

    erts_fprintf(stderr, "--- thread progress ---\n");
    erts_fprintf(stderr, "current=%b64u\n", erts_thr_progress_current());
    for (id = 0; id < sz; id++) {
        ErtsThrPrgrVal current = read_nob(&intrnl->thr[id].data.current);
#ifdef ERTS_THR_PROGRESS_STATE_DEBUG
        erts_aint32_t state_debug;
        char *active, *leader;

        state_debug = erts_atomic32_read_nob(&intrnl->thr[id].data.state_debug);
        active = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE
                  ? "true"
                  : "false");
        leader = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_LEADER
                  ? "true"
                  : "false");
#endif
        if (current == ERTS_THR_PRGR_VAL_WAITING)
            erts_fprintf(stderr,
                         " id=%d, current=WAITING"
#ifdef ERTS_THR_PROGRESS_STATE_DEBUG
                         ", active=%s, leader=%s"
#endif
                         "\n", id
#ifdef ERTS_THR_PROGRESS_STATE_DEBUG
                         , active, leader
#endif
                         );
        else
            erts_fprintf(stderr,
                         " id=%d, current=%b64u"
#ifdef ERTS_THR_PROGRESS_STATE_DEBUG
                         ", active=%s, leader=%s"
#endif
                         "\n", id, current
#ifdef ERTS_THR_PROGRESS_STATE_DEBUG
                         , active, leader
#endif
                         );
    }
    erts_fprintf(stderr, "-----------------------\n");
}