1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 2011-2018. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /*
22 * Description: Thread progress information. Used by lock free algorithms
23 * to determine when all involved threads are guaranteed to
24 * have passed a specific point of execution.
25 *
26 * Usage instructions below.
27 *
28 * Author: Rickard Green
29 */
30
31 /*
32 * ------ Usage instructions -----------------------------------------------
33 *
34 * This module keeps track of the progress of a set of managed threads. Only
35 * threads that behave well can be allowed to be managed. A managed thread
36 * should update its thread progress frequently. Currently only scheduler
37 * threads, the system-message-dispatcher threads, and the aux-thread are
38 * managed threads. We typically do not want any async threads as managed
39 * threads since they cannot guarantee a frequent update of thread progress,
40 * since they execute user implemented driver code that is assumed to be
41 * time consuming.
42 *
43 * erts_thr_progress_current() returns the global current thread progress
44 * value of managed threads. I.e., the latest progress value that all
45 * managed threads have reached. Thread progress values are opaque.
46 *
47 * erts_thr_progress_has_reached(VAL) returns a value != 0 if current
48 * global thread progress has reached or passed VAL.
49 *
 * erts_thr_progress_later() returns a thread progress value in the future
 * which no managed thread has yet reached.
52 *
53 * All threads issue a full memory barrier when reaching a new thread
54 * progress value. They only reach new thread progress values in specific
55 * controlled states when calling erts_thr_progress_update(). Schedulers
56 * call erts_thr_progress_update() in between execution of processes,
57 * when going to sleep and when waking up.
58 *
 * Sleeping managed threads are considered to have reached the next thread
 * progress value immediately. They are not woken and therefore do not
 * issue any memory barriers when reaching a new thread progress value.
 * A sleeping thread does, however, immediately issue a memory barrier
 * upon wakeup.
64 *
 * Both managed and registered unmanaged threads may request wakeup when
 * the global thread progress reaches a certain value using
 * erts_thr_progress_wakeup().
68 *
69 * Note that thread progress values are opaque, and that you are only
70 * allowed to use thread progress values retrieved from this API!
71 *
72 * -------------------------------------------------------------------------
73 */
74
75 #ifdef HAVE_CONFIG_H
76 # include "config.h"
77 #endif
78
79 #include <stddef.h> /* offsetof() */
80 #include "erl_thr_progress.h"
81 #include "global.h"
82
83
/*
 * When non-zero, wakeup request values passed to has_reached_wakeup()
 * are sanity checked against the current global progress value.
 * Enabled automatically in debug builds.
 */
#define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 0

#ifdef DEBUG
#undef ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
#define ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE 1
#endif

/* Development trace printouts; all disabled by default */
#define ERTS_THR_PRGR_PRINT_LEADER 0
#define ERTS_THR_PRGR_PRINT_VAL 0
#define ERTS_THR_PRGR_PRINT_BLOCKERS 0

/* Poll interval used while blocked on a fatal error
 * (units not visible in this file -- TODO confirm) */
#define ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL 100

/*
 * Layout of the 'lflgs' (leader flags) atomic word:
 *   bit 31     - a thread is blocking thread progress
 *   bit 30     - no thread currently acts as leader
 *   bit 29     - the leader is waiting for unmanaged threads
 *   bits 0..28 - number of currently active managed threads
 */
#define ERTS_THR_PRGR_LFLG_BLOCK ((erts_aint32_t) (1U << 31))
#define ERTS_THR_PRGR_LFLG_NO_LEADER ((erts_aint32_t) (1U << 30))
#define ERTS_THR_PRGR_LFLG_WAITING_UM ((erts_aint32_t) (1U << 29))
#define ERTS_THR_PRGR_LFLG_ACTIVE_MASK (~(ERTS_THR_PRGR_LFLG_NO_LEADER \
                                          | ERTS_THR_PRGR_LFLG_BLOCK \
                                          | ERTS_THR_PRGR_LFLG_WAITING_UM))

/* Extract the active managed thread count from an lflgs value */
#define ERTS_THR_PRGR_LFLGS_ACTIVE(LFLGS) \
    ((LFLGS) & ERTS_THR_PRGR_LFLG_ACTIVE_MASK)

/*
 * We use a 64-bit value for thread progress. Because of this, wrapping
 * of the thread progress value will more or less never occur.
 *
 * On 32-bit systems we therefore need a double word atomic.
 */
#undef read_acqb
#define read_acqb erts_thr_prgr_read_acqb__
#undef read_nob
#define read_nob erts_thr_prgr_read_nob__
117
/* Store 'val' into a thread progress atomic with a full memory barrier. */
static ERTS_INLINE void
set_mb(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
{
    erts_atomic64_set_mb(atmc, (erts_aint64_t) val);
}
123
/* Store 'val' into a thread progress atomic without any memory barrier. */
static ERTS_INLINE void
set_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
{
    erts_atomic64_set_nob(atmc, (erts_aint64_t) val);
}
129
/* Initialize a thread progress atomic to 'val' without any memory barrier. */
static ERTS_INLINE void
init_nob(ERTS_THR_PRGR_ATOMIC *atmc, ErtsThrPrgrVal val)
{
    erts_atomic64_init_nob(atmc, (erts_aint64_t) val);
}
135
/* #define ERTS_THR_PROGRESS_STATE_DEBUG */

#ifdef ERTS_THR_PROGRESS_STATE_DEBUG

#ifdef __GNUC__
#warning "Thread progress state debug is on"
#endif

/* Per-thread debug state bits kept in thr[ID].data.state_debug */
#define ERTS_THR_PROGRESS_STATE_DEBUG_LEADER ((erts_aint32_t) (1U << 0))
#define ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE ((erts_aint32_t) (1U << 1))

/* Threads start out as active non-leaders */
#define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID) \
    erts_atomic32_init_nob(&intrnl->thr[(ID)].data.state_debug, \
                           ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE)

/* Record whether thread ID is active; read-modify-write is not atomic,
 * but only the owning thread updates its own state_debug */
#define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON) \
do { \
    erts_aint32_t state_debug__; \
    state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug); \
    if ((ON)) \
        state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE; \
    else \
        state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE; \
    erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__); \
} while (0)

/* Record whether thread ID currently is the leader */
#define ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON) \
do { \
    erts_aint32_t state_debug__; \
    state_debug__ = erts_atomic32_read_nob(&intrnl->thr[(ID)].data.state_debug); \
    if ((ON)) \
        state_debug__ |= ERTS_THR_PROGRESS_STATE_DEBUG_LEADER; \
    else \
        state_debug__ &= ~ERTS_THR_PROGRESS_STATE_DEBUG_LEADER; \
    erts_atomic32_set_nob(&intrnl->thr[(ID)].data.state_debug, state_debug__); \
} while (0)

#else

#define ERTS_THR_PROGRESS_STATE_DEBUG_INIT(ID)
#define ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(ID, ON)
#define ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(ID, ON)

#endif /* ERTS_THR_PROGRESS_STATE_DEBUG */

/* Special blocker identifiers: no valid blocker / blocked by an
 * unmanaged thread */
#define ERTS_THR_PRGR_BLCKR_INVALID ((erts_aint32_t) (~0U))
#define ERTS_THR_PRGR_BLCKR_UNMANAGED ((erts_aint32_t) (1U << 31))

/* Set in 'block_count' while no block operation is in progress */
#define ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING ((erts_aint32_t) (1U << 31))

/* Unmanaged wakeup bitmaps use 32-bit words (shift/mask for div/mod 32) */
#define ERTS_THR_PRGR_BM_BITS 32
#define ERTS_THR_PRGR_BM_SHIFT 5
#define ERTS_THR_PRGR_BM_MASK 0x1f

/* Map a progress value to a wakeup data slot; assumes
 * ERTS_THR_PRGR_WAKEUP_DATA_SIZE is a power of two */
#define ERTS_THR_PRGR_WAKEUP_DATA_MASK (ERTS_THR_PRGR_WAKEUP_DATA_SIZE - 1)

#define ERTS_THR_PRGR_WAKEUP_IX(V) \
    ((int) ((V) & ERTS_THR_PRGR_WAKEUP_DATA_MASK))
194
/* Wakeup requests from managed threads for one progress value slot:
 * 'id' holds 'len' managed thread ids (allocated with trailing space
 * for intrnl->managed.no - 1 additional entries). */
typedef struct {
    erts_atomic32_t len;
    int id[1];
} ErtsThrPrgrManagedWakeupData;

/* Wakeup requests from unmanaged threads for one progress value slot,
 * kept as a two-level bitmap: a bit in 'high' marks a non-empty word
 * in 'low'; a bit in 'low' marks a requesting unmanaged thread id. */
typedef struct {
    erts_atomic32_t len;      /* number of pending requests */
    int high_sz;              /* words in 'high' */
    int low_sz;               /* words in 'low' */
    erts_atomic32_t *high;
    erts_atomic32_t *low;
} ErtsThrPrgrUnmanagedWakeupData;

/* Miscellaneous shared coordination state */
typedef struct {
    erts_atomic32_t lflgs;            /* leader flags; see ERTS_THR_PRGR_LFLG_* */
    erts_atomic32_t block_count;      /* threads a blocker waits for + NOT_BLOCKING flag */
    erts_atomic_t blocker_event;      /* erts_tse_t * of the blocking thread, or NULL */
    erts_atomic32_t pref_wakeup_used; /* non-zero once managed id 0 has been claimed */
    erts_atomic32_t managed_count;    /* managed threads registered so far */
    erts_atomic32_t managed_id;       /* id allocator for non-scheduler managed threads */
    erts_atomic32_t unmanaged_id;     /* id allocator for unmanaged threads */
    int chk_next_ix;                  /* saved leader scan position */
    struct {
        int waiting;                  /* umrefc slot the leader waits on, or -1 */
        erts_atomic32_t current;      /* umrefc slot new unmanaged delays use */
    } umrefc_ix;
} ErtsThrPrgrMiscData;

/* Per-managed-thread progress value (optionally with debug state) */
typedef struct {
    ERTS_THR_PRGR_ATOMIC current;
#ifdef ERTS_THR_PROGRESS_STATE_DEBUG
    erts_atomic32_t state_debug;
#endif
} ErtsThrPrgrElement;

/* Cache-line aligned wrapper to avoid false sharing between threads */
typedef union {
    ErtsThrPrgrElement data;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsThrPrgrElement))];
} ErtsThrPrgrArray;

/* Cache-line aligned reference counter for delaying unmanaged threads */
typedef union {
    erts_atomic_t refc;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(erts_atomic_t))];
} ErtsThrPrgrUnmanagedRefc;

/* All module state; allocated as one block in erts_thr_progress_init() */
typedef struct {
    union {
        ErtsThrPrgrMiscData data;
        char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
                sizeof(ErtsThrPrgrMiscData))];
    } misc;
    ErtsThrPrgrUnmanagedRefc umrefc[2]; /* double-buffered; see leader_update() */
    ErtsThrPrgrArray *thr;              /* per-managed-thread progress values */
    struct {
        int no;
        ErtsThrPrgrCallbacks *callbacks;
        ErtsThrPrgrManagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
    } managed;
    struct {
        int no;
        ErtsThrPrgrCallbacks *callbacks;
        ErtsThrPrgrUnmanagedWakeupData *data[ERTS_THR_PRGR_WAKEUP_DATA_SIZE];
    } unmanaged;
} ErtsThrPrgrInternalData;
259
/* All internal state; one permanent allocation made in
 * erts_thr_progress_init() (NULL until then). */
static ErtsThrPrgrInternalData *intrnl;

/* Global current thread progress value; read via the inline API. */
ErtsThrPrgr erts_thr_prgr__;

/* Thread specific data key mapping a thread to its ErtsThrPrgrData. */
erts_tsd_key_t erts_thr_prgr_data_key__;

static void handle_wakeup_requests(ErtsThrPrgrVal current);
static int got_sched_wakeups(void);
static erts_aint32_t block_thread(ErtsThrPrgrData *tpd);
269
270 static ERTS_INLINE void
wakeup_managed(int id)271 wakeup_managed(int id)
272 {
273 ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[id];
274 ASSERT(0 <= id && id < intrnl->managed.no);
275 cbp->wakeup(cbp->arg);
276 }
277
278
279 static ERTS_INLINE void
wakeup_unmanaged(int id)280 wakeup_unmanaged(int id)
281 {
282 ErtsThrPrgrCallbacks *cbp = &intrnl->unmanaged.callbacks[id];
283 ASSERT(0 <= id && id < intrnl->unmanaged.no);
284 cbp->wakeup(cbp->arg);
285 }
286
287 static ERTS_INLINE ErtsThrPrgrData *
perhaps_thr_prgr_data(ErtsSchedulerData * esdp)288 perhaps_thr_prgr_data(ErtsSchedulerData *esdp)
289 {
290 if (esdp)
291 return &esdp->thr_progress_data;
292 else
293 return erts_tsd_get(erts_thr_prgr_data_key__);
294 }
295
296 static ERTS_INLINE ErtsThrPrgrData *
thr_prgr_data(ErtsSchedulerData * esdp)297 thr_prgr_data(ErtsSchedulerData *esdp)
298 {
299 ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);
300 ASSERT(tpd);
301 return tpd;
302 }
303
/*
 * Initialize 'tpd' as a temporary, unmanaged, non-blocking thread
 * progress data record and install it in thread specific data. Used
 * by threads that need progress data only briefly; released again via
 * return_tmp_thr_prgr_data().
 */
static void
init_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
{
    tpd->id = -1; /* not a registered thread */
    tpd->is_managed = 0;
    tpd->is_blocking = 0;
    tpd->is_temporary = 1;
#ifdef ERTS_ENABLE_LOCK_CHECK
    tpd->is_delaying = 0;
#endif
    erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);
}
316
/*
 * Return the calling thread's progress data, allocating and installing
 * a temporary record if the thread has none. Temporary records are
 * freed by return_tmp_thr_prgr_data().
 */
static ERTS_INLINE ErtsThrPrgrData *
tmp_thr_prgr_data(ErtsSchedulerData *esdp)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(esdp);

    if (!tpd) {
        /*
         * We only allocate the part up to the wakeup_request field which is
         * the first field only used by registered threads
         */
        size_t alloc_size = offsetof(ErtsThrPrgrData, wakeup_request);

        /* We may land here as a result of unmanaged_delay being called from
         * the lock counting module, which in turn might be called from within
         * the allocator, so we use plain malloc to avoid deadlocks. */
        tpd =
#ifdef ERTS_ENABLE_LOCK_COUNT
            malloc(alloc_size);
            /* NOTE(review): the malloc() result is not checked; OOM leads
             * to a NULL dereference below -- confirm this is acceptable */
#else
            erts_alloc(ERTS_ALC_T_T_THR_PRGR_DATA, alloc_size);
#endif

        init_tmp_thr_prgr_data(tpd);
    }

    return tpd;
}
344
/*
 * Release a record obtained via tmp_thr_prgr_data() if it is temporary;
 * registered (non-temporary) records are left untouched.
 */
static ERTS_INLINE void
return_tmp_thr_prgr_data(ErtsThrPrgrData *tpd)
{
    if (tpd->is_temporary) {
        erts_tsd_set(erts_thr_prgr_data_key__, NULL);

        /* Must match the allocator used in tmp_thr_prgr_data() */
#ifdef ERTS_ENABLE_LOCK_COUNT
        free(tpd);
#else
        erts_free(ERTS_ALC_T_T_THR_PRGR_DATA, tpd);
#endif
    }
}
358
/*
 * Decrement the count of threads a pending blocker still waits for.
 * If the count reaches zero, wake the blocker via its event.
 * Returns non-zero iff a block operation is currently in progress
 * (the NOT_BLOCKING flag is cleared).
 */
static ERTS_INLINE int
block_count_dec(void)
{
    erts_aint32_t block_count;
    block_count = erts_atomic32_dec_read_mb(&intrnl->misc.data.block_count);
    if (block_count == 0) {
        erts_tse_t *event;
        event = ((erts_tse_t*)
                 erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
        if (event)
            erts_tse_set(event);
        return 1;
    }

    return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
}
375
/*
 * Increment the count of threads a blocker has to wait for. Returns
 * non-zero iff a block operation is currently in progress (the
 * NOT_BLOCKING flag is cleared), in which case the caller must block.
 */
static ERTS_INLINE int
block_count_inc(void)
{
    erts_aint32_t block_count;
    block_count = erts_atomic32_inc_read_mb(&intrnl->misc.data.block_count);
    return (block_count & ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING) == 0;
}
383
384
/*
 * Early initialization; must run before any other use of this module.
 * Creates the TSD key and sets the global progress value to the first
 * valid value. Full initialization happens in erts_thr_progress_init().
 */
void
erts_thr_progress_pre_init(void)
{
    intrnl = NULL;
    erts_tsd_key_create(&erts_thr_prgr_data_key__,
                        "erts_thr_prgr_data_key");
    init_nob(&erts_thr_prgr__.current, ERTS_THR_PRGR_VAL_FIRST);
}
393
/*
 * Allocate and initialize all internal state for 'managed' managed and
 * 'unmanaged' unmanaged threads. Everything is carved out of a single
 * permanent, cache-aligned allocation laid out as: misc data, managed
 * thread array, callback array, managed wakeup slots, unmanaged wakeup
 * slots.
 */
void
erts_thr_progress_init(int no_schedulers, int managed, int unmanaged)
{
    int i, j, um_low, um_high;
    char *ptr;
    size_t cb_sz, intrnl_sz, thr_arr_sz, m_wakeup_size, um_wakeup_size,
        tot_size;

    /* Compute cache-line aligned sizes of each sub-area... */
    intrnl_sz = sizeof(ErtsThrPrgrInternalData);
    intrnl_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(intrnl_sz);

    cb_sz = sizeof(ErtsThrPrgrCallbacks)*(managed+unmanaged);
    cb_sz = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(cb_sz);

    thr_arr_sz = sizeof(ErtsThrPrgrArray)*managed;
    ASSERT(thr_arr_sz == ERTS_ALC_CACHE_LINE_ALIGN_SIZE(thr_arr_sz));

    /* Managed wakeup data already holds one id; add room for the rest */
    m_wakeup_size = sizeof(ErtsThrPrgrManagedWakeupData);
    m_wakeup_size += (managed - 1)*sizeof(int);
    m_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(m_wakeup_size);

    /* Two-level bitmap sizes: one low bit per unmanaged thread, one
     * high bit per low word (rounded up) */
    um_low = (unmanaged - 1)/ERTS_THR_PRGR_BM_BITS + 1;
    um_high = (um_low - 1)/ERTS_THR_PRGR_BM_BITS + 1;

    um_wakeup_size = sizeof(ErtsThrPrgrUnmanagedWakeupData);
    um_wakeup_size += (um_high + um_low)*sizeof(erts_atomic32_t);
    um_wakeup_size = ERTS_ALC_CACHE_LINE_ALIGN_SIZE(um_wakeup_size);

    tot_size = intrnl_sz;
    tot_size += cb_sz;
    tot_size += thr_arr_sz;
    tot_size += m_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE;
    tot_size += um_wakeup_size*ERTS_THR_PRGR_WAKEUP_DATA_SIZE;

    ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_THR_PRGR_IDATA,
                                             tot_size);

    intrnl = (ErtsThrPrgrInternalData *) ptr;
    ptr += intrnl_sz;

    /* Start with no leader; block_count counts all managed threads */
    erts_atomic32_init_nob(&intrnl->misc.data.lflgs,
                           ERTS_THR_PRGR_LFLG_NO_LEADER);
    erts_atomic32_init_nob(&intrnl->misc.data.block_count,
                           (ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING
                            | (erts_aint32_t) managed));
    erts_atomic_init_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);
    erts_atomic32_init_nob(&intrnl->misc.data.pref_wakeup_used, 0);
    erts_atomic32_init_nob(&intrnl->misc.data.managed_count, 0);
    /* Ids 1..no_schedulers are reserved for schedulers; other managed
     * threads get ids from this counter (see register_managed_thread) */
    erts_atomic32_init_nob(&intrnl->misc.data.managed_id, no_schedulers);
    erts_atomic32_init_nob(&intrnl->misc.data.unmanaged_id, -1);
    intrnl->misc.data.chk_next_ix = 0;
    intrnl->misc.data.umrefc_ix.waiting = -1;
    erts_atomic32_init_nob(&intrnl->misc.data.umrefc_ix.current, 0);

    erts_atomic_init_nob(&intrnl->umrefc[0].refc, (erts_aint_t) 0);
    erts_atomic_init_nob(&intrnl->umrefc[1].refc, (erts_aint_t) 0);

    intrnl->thr = (ErtsThrPrgrArray *) ptr;
    ptr += thr_arr_sz;
    for (i = 0; i < managed; i++)
        init_nob(&intrnl->thr[i].data.current, 0);

    /* Managed and unmanaged callbacks share one contiguous array */
    intrnl->managed.callbacks = (ErtsThrPrgrCallbacks *) ptr;
    intrnl->unmanaged.callbacks = &intrnl->managed.callbacks[managed];
    ptr += cb_sz;

    intrnl->managed.no = managed;
    for (i = 0; i < managed; i++) {
        intrnl->managed.callbacks[i].arg = NULL;
        intrnl->managed.callbacks[i].wakeup = NULL;
    }

    intrnl->unmanaged.no = unmanaged;
    for (i = 0; i < unmanaged; i++) {
        intrnl->unmanaged.callbacks[i].arg = NULL;
        intrnl->unmanaged.callbacks[i].wakeup = NULL;
    }

    for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
        intrnl->managed.data[i] = (ErtsThrPrgrManagedWakeupData *) ptr;
        erts_atomic32_init_nob(&intrnl->managed.data[i]->len, 0);
        ptr += m_wakeup_size;
    }

    for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
        erts_atomic32_t *bm;
        intrnl->unmanaged.data[i] = (ErtsThrPrgrUnmanagedWakeupData *) ptr;
        erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->len, 0);
        /* Bitmap words follow directly after the struct: high first,
         * then low */
        bm = (erts_atomic32_t *) (ptr + sizeof(ErtsThrPrgrUnmanagedWakeupData));
        intrnl->unmanaged.data[i]->high = bm;
        intrnl->unmanaged.data[i]->high_sz = um_high;
        for (j = 0; j < um_high; j++)
            erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->high[j], 0);
        intrnl->unmanaged.data[i]->low
            = &intrnl->unmanaged.data[i]->high[um_high];
        intrnl->unmanaged.data[i]->low_sz = um_low;
        for (j = 0; j < um_low; j++)
            erts_atomic32_init_nob(&intrnl->unmanaged.data[i]->low[j], 0);
        ptr += um_wakeup_size;
    }
    /* Make the initialized state visible to all threads */
    ERTS_THR_MEMORY_BARRIER;
}
496
/*
 * Initialize a thread's wakeup request array. Each slot is set to a
 * distinct value safely in the past relative to the current global
 * progress value, so no slot can be mistaken for a pending request.
 */
static void
init_wakeup_request_array(ErtsThrPrgrVal *w)
{
    int i;
    ErtsThrPrgrVal current;

    current = read_acqb(&erts_thr_prgr__.current);
    for (i = 0; i < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; i++) {
        w[i] = current - ((ErtsThrPrgrVal) (ERTS_THR_PRGR_WAKEUP_DATA_SIZE + i));
        if (w[i] > current)
            w[i]--; /* wrapped; presumably adjusts past the reserved
                     * WAITING value -- TODO confirm */
    }
}
510
erts_thr_progress_data(void)511 ErtsThrPrgrData *erts_thr_progress_data(void) {
512 return erts_tsd_get(erts_thr_prgr_data_key__);
513 }
514
/*
 * Register the calling thread as an unmanaged thread with the given
 * wakeup callbacks. Aborts if the thread is already registered or if
 * more threads register than were declared to erts_thr_progress_init().
 */
void
erts_thr_progress_register_unmanaged_thread(ErtsThrPrgrCallbacks *callbacks)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
    int is_blocking = 0;

    if (tpd) {
        /* A temporary record may already exist (e.g. from an unmanaged
         * delay); carry its blocking state over and release it */
        if (!tpd->is_temporary)
            erts_exit(ERTS_ABORT_EXIT,
                      "%s:%d:%s(): Double register of thread\n",
                      __FILE__, __LINE__, __func__);
        is_blocking = tpd->is_blocking;
        return_tmp_thr_prgr_data(tpd);
    }

    /*
     * We only allocate the part up to the leader field
     * which is the first field only used by managed threads
     */
    tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA,
                     offsetof(ErtsThrPrgrData, leader));
    tpd->id = (int) erts_atomic32_inc_read_nob(&intrnl->misc.data.unmanaged_id);
    tpd->is_managed = 0;
    tpd->is_blocking = is_blocking;
    tpd->is_temporary = 0;
#ifdef ERTS_ENABLE_LOCK_CHECK
    tpd->is_delaying = 0;
#endif
    ASSERT(tpd->id >= 0);
    if (tpd->id >= intrnl->unmanaged.no)
        erts_exit(ERTS_ABORT_EXIT,
                  "%s:%d:%s(): Too many unmanaged registered threads\n",
                  __FILE__, __LINE__, __func__);

    init_wakeup_request_array(&tpd->wakeup_request[0]);
    erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);

    ASSERT(callbacks->wakeup);

    intrnl->unmanaged.callbacks[tpd->id] = *callbacks;
}
556
557
/*
 * Register the calling thread as a managed thread. 'esdp' is non-NULL
 * for scheduler threads (whose progress data lives in the scheduler
 * data and whose id equals the scheduler number); other managed threads
 * get allocated records and ids. 'pref_wakeup' claims the preferred
 * wakeup slot (id 0, first come first served); 'deep_sleeper' marks
 * threads that should not be woken to drive progress themselves (see
 * erts_thr_progress_prepare_wait()). Blocks until all declared managed
 * threads have registered. Returns the thread's progress data.
 */
ErtsThrPrgrData *
erts_thr_progress_register_managed_thread(ErtsSchedulerData *esdp,
                                          ErtsThrPrgrCallbacks *callbacks,
                                          int pref_wakeup,
                                          int deep_sleeper)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
    int is_blocking = 0, managed;

    if (tpd) {
        /* Replace any existing temporary record, keeping its blocking
         * state */
        if (!tpd->is_temporary)
            erts_exit(ERTS_ABORT_EXIT,
                      "%s:%d:%s(): Double register of thread\n",
                      __FILE__, __LINE__, __func__);
        is_blocking = tpd->is_blocking;
        return_tmp_thr_prgr_data(tpd);
    }

    if (esdp)
        tpd = &esdp->thr_progress_data;
    else
        tpd = erts_alloc(ERTS_ALC_T_THR_PRGR_DATA, sizeof(ErtsThrPrgrData));

    /* Id assignment: the first thread asking for preferred wakeup gets
     * id 0; schedulers use their scheduler number; the rest draw from
     * the managed_id counter (initialized past the scheduler ids) */
    if (pref_wakeup
        && !erts_atomic32_xchg_nob(&intrnl->misc.data.pref_wakeup_used, 1))
        tpd->id = 0;
    else if (esdp)
        tpd->id = (int) esdp->no;
    else
        tpd->id = erts_atomic32_inc_read_nob(&intrnl->misc.data.managed_id);
    ASSERT(tpd->id >= 0);
    if (tpd->id >= intrnl->managed.no)
        erts_exit(ERTS_ABORT_EXIT,
                  "%s:%d:%s(): Too many managed registered threads\n",
                  __FILE__, __LINE__, __func__);

    tpd->is_managed = 1;
    tpd->is_blocking = is_blocking;
    tpd->is_temporary = 0;
    tpd->is_deep_sleeper = deep_sleeper;
#ifdef ERTS_ENABLE_LOCK_CHECK
    tpd->is_delaying = 1;
#endif

    init_wakeup_request_array(&tpd->wakeup_request[0]);

    ERTS_THR_PROGRESS_STATE_DEBUG_INIT(tpd->id);

    tpd->leader = 0;
    tpd->active = 1;
    tpd->confirmed = 0;
    tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
    erts_tsd_set(erts_thr_prgr_data_key__, (void *) tpd);

    /* Count ourselves as active (low bits of lflgs) */
    erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);

    ASSERT(callbacks->wakeup);
    ASSERT(callbacks->prepare_wait);
    ASSERT(callbacks->wait);
    ASSERT(callbacks->finalize_wait);

    intrnl->managed.callbacks[tpd->id] = *callbacks;

    /* Rendezvous: wait until all managed threads have registered; the
     * last one to register wakes the others */
    callbacks->prepare_wait(callbacks->arg);
    managed = erts_atomic32_inc_read_relb(&intrnl->misc.data.managed_count);
    if (managed != intrnl->managed.no) {
        /* Wait until all managed threads have registered... */
        do {
            callbacks->wait(callbacks->arg);
            callbacks->prepare_wait(callbacks->arg);
            managed = erts_atomic32_read_acqb(&intrnl->misc.data.managed_count);
        } while (managed != intrnl->managed.no);
    }
    else {
        int id;
        /* All managed threads have registered; lets go... */
        for (id = 0; id < managed; id++)
            if (id != tpd->id)
                wakeup_managed(id);
    }
    callbacks->finalize_wait(callbacks->arg);
    return tpd;
}
641
/*
 * Leader-side progress driving. The leader scans all managed threads'
 * progress values and the unmanaged reference counters; when everyone
 * has reached 'next' (or is sleeping), the global current value is
 * advanced and pending wakeup requests for it are handled. An inactive
 * leader hands leadership back by setting the NO_LEADER flag.
 * Returns non-zero iff the caller still is (or just became) leader.
 */
static ERTS_INLINE int
leader_update(ErtsThrPrgrData *tpd)
{
#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_check_exact(NULL, 0);
#endif
    if (!tpd->leader) {
        /* Probably need to block... */
        block_thread(tpd);
    }
    else {
        ErtsThrPrgrVal current;
        int ix, chk_next_ix, umrefc_ix, my_ix, no_managed, waiting_unmanaged;
        erts_aint32_t lflgs;
        ErtsThrPrgrVal next;
        erts_aint_t refc;

        my_ix = tpd->id;

        if (tpd->leader_state.current == ERTS_THR_PRGR_VAL_WAITING) {
            /* Took over as leader from another thread; resume from the
             * state the previous leader left in the shared misc data */
            tpd->leader_state.current = read_nob(&erts_thr_prgr__.current);
            tpd->leader_state.next = tpd->leader_state.current;
            tpd->leader_state.next++;
            if (tpd->leader_state.next == ERTS_THR_PRGR_VAL_WAITING)
                tpd->leader_state.next = 0;
            tpd->leader_state.chk_next_ix = intrnl->misc.data.chk_next_ix;
            tpd->leader_state.umrefc_ix.waiting = intrnl->misc.data.umrefc_ix.waiting;
            tpd->leader_state.umrefc_ix.current =
                (int) erts_atomic32_read_nob(&intrnl->misc.data.umrefc_ix.current);

            /* Confirm our own next value so we do not wait on ourselves */
            if (tpd->confirmed == tpd->leader_state.current) {
                ErtsThrPrgrVal val = tpd->leader_state.current + 1;
                if (val == ERTS_THR_PRGR_VAL_WAITING)
                    val = 0;
                tpd->confirmed = val;
                set_mb(&intrnl->thr[my_ix].data.current, val);
            }
        }


        next = tpd->leader_state.next;

        waiting_unmanaged = 0;
        umrefc_ix = -1; /* Shut up annoying warning */

        chk_next_ix = tpd->leader_state.chk_next_ix;
        no_managed = intrnl->managed.no;
        ASSERT(0 <= chk_next_ix && chk_next_ix <= no_managed);
        /* Check managed threads; remember where we stopped so the scan
         * resumes there next time instead of restarting */
        if (chk_next_ix < no_managed) {
            for (ix = chk_next_ix; ix < no_managed; ix++) {
                ErtsThrPrgrVal tmp;
                if (ix == my_ix)
                    continue;
                tmp = read_nob(&intrnl->thr[ix].data.current);
                if (tmp != next && tmp != ERTS_THR_PRGR_VAL_WAITING) {
                    /* Thread 'ix' has not reached 'next' yet */
                    tpd->leader_state.chk_next_ix = ix;
                    ASSERT(erts_thr_progress_has_passed__(next, tmp));
                    goto done;
                }
            }
        }

        /* Check unmanaged threads. Delaying unmanaged threads hold a
         * reference in one of the two umrefc slots; flip the slot new
         * delayers use and wait for the old slot to drain. */
        waiting_unmanaged = tpd->leader_state.umrefc_ix.waiting != -1;
        umrefc_ix = (waiting_unmanaged
                     ? tpd->leader_state.umrefc_ix.waiting
                     : tpd->leader_state.umrefc_ix.current);
        refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
        ASSERT(refc >= 0);
        if (refc != 0) {
            int new_umrefc_ix;

            if (waiting_unmanaged)
                goto done;

            new_umrefc_ix = (umrefc_ix + 1) & 0x1;
            tpd->leader_state.umrefc_ix.waiting = umrefc_ix;
            tpd->leader_state.chk_next_ix = no_managed;
            erts_atomic32_set_nob(&intrnl->misc.data.umrefc_ix.current,
                                  (erts_aint32_t) new_umrefc_ix);
            tpd->leader_state.umrefc_ix.current = new_umrefc_ix;
            ETHR_MEMBAR(ETHR_StoreLoad);
            /* Re-check after publishing the new slot index */
            refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
            ASSERT(refc >= 0);
            waiting_unmanaged = 1;
            if (refc != 0)
                goto done;
        }

        /* Make progress */
        current = next;

        next++;
        if (next == ERTS_THR_PRGR_VAL_WAITING)
            next = 0;

        set_nob(&intrnl->thr[my_ix].data.current, next);
        set_mb(&erts_thr_prgr__.current, current);
        tpd->confirmed = next;
        tpd->leader_state.next = next;
        tpd->leader_state.current = current;

#if ERTS_THR_PRGR_PRINT_VAL
        if (current % 1000 == 0)
            erts_fprintf(stderr, "%b64u\n", current);
#endif
        handle_wakeup_requests(current);

        if (waiting_unmanaged) {
            waiting_unmanaged = 0;
            tpd->leader_state.umrefc_ix.waiting = -1;
            erts_atomic32_read_band_nob(&intrnl->misc.data.lflgs,
                                        ~ERTS_THR_PRGR_LFLG_WAITING_UM);
        }
        tpd->leader_state.chk_next_ix = 0;

    done:

        if (tpd->active) {
            lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
            if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
                (void) block_thread(tpd);
        }
        else {
            /* Inactive leader: hand back leadership and publish our
             * saved scan state for the next leader to pick up */
            erts_aint32_t set_flags = ERTS_THR_PRGR_LFLG_NO_LEADER;
            tpd->leader = 0;
            tpd->leader_state.current = ERTS_THR_PRGR_VAL_WAITING;
#if ERTS_THR_PRGR_PRINT_LEADER
            erts_fprintf(stderr, "L <- %d\n", tpd->id);
#endif

            ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 0);

            intrnl->misc.data.umrefc_ix.waiting
                = tpd->leader_state.umrefc_ix.waiting;
            if (waiting_unmanaged)
                set_flags |= ERTS_THR_PRGR_LFLG_WAITING_UM;

            lflgs = erts_atomic32_read_bor_relb(&intrnl->misc.data.lflgs,
                                                set_flags);
            lflgs |= set_flags;
            if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
                lflgs = block_thread(tpd);

            if (waiting_unmanaged) {
                /* Need to check umrefc again */
                ETHR_MEMBAR(ETHR_StoreLoad);
                refc = erts_atomic_read_nob(&intrnl->umrefc[umrefc_ix].refc);
                if (refc == 0 && got_sched_wakeups()) {
                    /* Someone needs to make progress */
                    wakeup_managed(tpd->id);
                }
            }
        }
    }

    return tpd->leader;
}
802
/*
 * Non-leader-side progress update. Confirms the global current value by
 * bumping this thread's own progress value, and tries to take over
 * leadership if no leader exists. Returns non-zero iff the caller must
 * (continue to) call leader_update().
 */
static int
update(ErtsThrPrgrData *tpd)
{
    int res;
    ErtsThrPrgrVal val;

    if (tpd->leader)
        res = 1;
    else {
        erts_aint32_t lflgs;
        res = 0;
        val = read_acqb(&erts_thr_prgr__.current);
        if (tpd->confirmed == val) {
            /* Confirm the global value by advancing our own value past it */
            val++;
            if (val == ERTS_THR_PRGR_VAL_WAITING)
                val = 0;
            tpd->confirmed = val;
            set_mb(&intrnl->thr[tpd->id].data.current, val);
        }

        lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
        if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
            res = 1; /* Need to block in leader_update() */

        /* Take over leadership if there is none, and we are active or
         * the last thread standing */
        if ((lflgs & ERTS_THR_PRGR_LFLG_NO_LEADER)
            && (tpd->active || ERTS_THR_PRGR_LFLGS_ACTIVE(lflgs) == 0)) {
            /* Try to take over leadership... */
            erts_aint32_t olflgs;
            olflgs = erts_atomic32_read_band_acqb(
                &intrnl->misc.data.lflgs,
                ~ERTS_THR_PRGR_LFLG_NO_LEADER);
            if (olflgs & ERTS_THR_PRGR_LFLG_NO_LEADER) {
                /* We won the race for leadership */
                tpd->leader = 1;
#if ERTS_THR_PRGR_PRINT_LEADER
                erts_fprintf(stderr, "L -> %d\n", tpd->id);
#endif
                ERTS_THR_PROGRESS_STATE_DEBUG_SET_LEADER(tpd->id, 1);
            }
        }
        res |= tpd->leader;
    }
    return res;
}
846
/*
 * Update the calling managed thread's progress. Returns non-zero iff
 * the caller also needs to call erts_thr_progress_leader_update().
 */
int
erts_thr_progress_update(ErtsThrPrgrData *tpd)
{
    return update(tpd);
}
852
853
/*
 * Perform leader duties (drive global progress or block); see
 * leader_update(). Returns non-zero iff the caller still is leader.
 */
int
erts_thr_progress_leader_update(ErtsThrPrgrData *tpd)
{
    return leader_update(tpd);
}
859
/*
 * Prepare the calling managed thread for sleep. The thread's progress
 * value is set to WAITING so the leader treats it as having reached any
 * new progress value immediately. If this leaves the system without a
 * leader and without active threads while wakeup requests are pending,
 * a managed thread is woken to keep progress going.
 */
void
erts_thr_progress_prepare_wait(ErtsThrPrgrData *tpd)
{
    erts_aint32_t lflgs;

#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_check_exact(NULL, 0);
#endif

    block_count_dec();

    tpd->confirmed = ERTS_THR_PRGR_VAL_WAITING;
    set_mb(&intrnl->thr[tpd->id].data.current, ERTS_THR_PRGR_VAL_WAITING);

    lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);

    /* No leader, no waiting-on-unmanaged, no active threads... */
    if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
                  | ERTS_THR_PRGR_LFLG_WAITING_UM
                  | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
        == ERTS_THR_PRGR_LFLG_NO_LEADER
        && got_sched_wakeups()) {
        /* Someone needs to make progress; deep sleepers wake managed
         * thread 1 instead of themselves (presumably a scheduler --
         * TODO confirm) */
        if (tpd->is_deep_sleeper)
            wakeup_managed(1);
        else
            wakeup_managed(tpd->id);
    }
}
888
/*
 * Finalize a wakeup of a managed thread. Repeatedly bumps the thread's
 * own progress value until it is past the (possibly still advancing)
 * global current value, re-registers for block counting, and performs
 * normal update/leader duties.
 */
void
erts_thr_progress_finalize_wait(ErtsThrPrgrData *tpd)
{
    ErtsThrPrgrVal current, val;

#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_check_exact(NULL, 0);
#endif

    /*
     * We aren't allowed to continue until our thread
     * progress is past global current.
     */
    val = current = read_acqb(&erts_thr_prgr__.current);
    while (1) {
        val++;
        if (val == ERTS_THR_PRGR_VAL_WAITING)
            val = 0;
        tpd->confirmed = val;
        set_mb(&intrnl->thr[tpd->id].data.current, val);
        val = read_acqb(&erts_thr_prgr__.current);
        if (current == val)
            break;
        /* Global value advanced under us; try again */
        current = val;
    }
    if (block_count_inc())
        block_thread(tpd); /* block operation in progress */
    if (update(tpd))
        leader_update(tpd);
}
919
/*
 * Mark the calling managed thread as active ('on' != 0) or inactive.
 * The active-thread count lives in the low bits of lflgs; a thread
 * going inactive may need to pass leadership on, hence the
 * update()/leader_update() on deactivation.
 */
void
erts_thr_progress_active(ErtsThrPrgrData *tpd, int on)
{

#ifdef ERTS_ENABLE_LOCK_CHECK
    erts_lc_check_exact(NULL, 0);
#endif

    ERTS_THR_PROGRESS_STATE_DEBUG_SET_ACTIVE(tpd->id, on);

    if (on) {
        ASSERT(!tpd->active);
        tpd->active = 1;
        erts_atomic32_inc_nob(&intrnl->misc.data.lflgs);
    }
    else {
        ASSERT(tpd->active);
        tpd->active = 0;
        erts_atomic32_dec_nob(&intrnl->misc.data.lflgs);
        if (update(tpd))
            leader_update(tpd);
    }

#ifdef DEBUG
    {
        /* Active count must include us when active and never exceed
         * the number of managed threads */
        erts_aint32_t n = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
        n &= ERTS_THR_PRGR_LFLG_ACTIVE_MASK;
        ASSERT(tpd->active <= n && n <= intrnl->managed.no);
    }
#endif

}
952
/*
 * End an unmanaged delay by dropping the reference taken on umrefc slot
 * 'handle'. If we were the last delayer in the slot and the leader is
 * waiting for unmanaged threads, wake a managed thread (id 1;
 * presumably chosen as a thread that can drive progress -- TODO
 * confirm) so progress can resume.
 */
static ERTS_INLINE void
unmanaged_continue(ErtsThrPrgrDelayHandle handle)
{
    int umrefc_ix = (int) handle;
    erts_aint_t refc;

    ASSERT(umrefc_ix == 0 || umrefc_ix == 1);
    refc = erts_atomic_dec_read_relb(&intrnl->umrefc[umrefc_ix].refc);
    ASSERT(refc >= 0);
    if (refc == 0) {
        erts_aint_t lflgs;
        ERTS_THR_READ_MEMORY_BARRIER;
        lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
        /* No leader, leader waiting on unmanaged, no active threads... */
        if ((lflgs & (ERTS_THR_PRGR_LFLG_NO_LEADER
                      | ERTS_THR_PRGR_LFLG_WAITING_UM
                      | ERTS_THR_PRGR_LFLG_ACTIVE_MASK))
            == (ERTS_THR_PRGR_LFLG_NO_LEADER|ERTS_THR_PRGR_LFLG_WAITING_UM)
            && got_sched_wakeups()) {
            /* Others waiting for us... */
            wakeup_managed(1);
        }
    }
}
976
/*
 * Public wrapper for ending an unmanaged delay; see
 * erts_thr_progress_unmanaged_delay__(). With lock checking enabled it
 * also maintains the per-thread delay nesting count and releases any
 * temporary progress data record when the last delay ends.
 */
void
erts_thr_progress_unmanaged_continue__(ErtsThrPrgrDelayHandle handle)
{
#ifdef ERTS_ENABLE_LOCK_CHECK
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
    ERTS_LC_ASSERT(tpd && tpd->is_delaying);
    tpd->is_delaying--;
    ASSERT(tpd->is_delaying >= 0);
    if (!tpd->is_delaying)
        return_tmp_thr_prgr_data(tpd);
#endif
    ASSERT(!erts_thr_progress_is_managed_thread());

    unmanaged_continue(handle);
}
992
/*
 * Begin an unmanaged delay: pin the current umrefc slot by incrementing
 * its reference counter, preventing global thread progress until the
 * matching erts_thr_progress_unmanaged_continue__() call. If the leader
 * flips the current slot index while we increment, release and retry
 * on the new slot. Returns the slot index as the delay handle.
 */
ErtsThrPrgrDelayHandle
erts_thr_progress_unmanaged_delay__(void)
{
    int umrefc_ix;
    ASSERT(!erts_thr_progress_is_managed_thread());
    umrefc_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
    while (1) {
        int tmp_ix;
        erts_atomic_inc_acqb(&intrnl->umrefc[umrefc_ix].refc);
        tmp_ix = (int) erts_atomic32_read_acqb(&intrnl->misc.data.umrefc_ix.current);
        if (tmp_ix == umrefc_ix)
            break;
        /* Slot changed under us; drop the reference and retry */
        unmanaged_continue(umrefc_ix);
        umrefc_ix = tmp_ix;
    }
#ifdef ERTS_ENABLE_LOCK_CHECK
    {
        ErtsThrPrgrData *tpd = tmp_thr_prgr_data(NULL);
        tpd->is_delaying++;
    }
#endif
    return (ErtsThrPrgrDelayHandle) umrefc_ix;
}
1016
/*
 * Return non-zero if global thread progress has reached or passed
 * 'wakeup'.
 */
static ERTS_INLINE int
has_reached_wakeup(ErtsThrPrgrVal wakeup)
{
    /*
     * Exactly the same as erts_thr_progress_has_reached(), but
     * also verify valid wakeup requests in debug mode.
     */
    ErtsThrPrgrVal current;

    current = read_acqb(&erts_thr_prgr__.current);

#if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
    {
	ErtsThrPrgrVal limit;
	/*
	 * erts_thr_progress_later() returns values which are
	 * equal to 'current + 2', or 'current + 3'. That is, users
	 * should never get a hold of values larger than that.
	 *
	 * That is, valid values are values less than 'current + 4'.
	 *
	 * Values larger than this won't work with the wakeup
	 * algorithm.
	 */

	limit = current + 4;
	if (limit == ERTS_THR_PRGR_VAL_WAITING)
	    limit = 0;
	else if (limit < current) /* Wrapped */
	    limit += 1;

	if (!erts_thr_progress_has_passed__(limit, wakeup))
	    erts_exit(ERTS_ABORT_EXIT,
		      "Invalid wakeup request value found:"
		      " current=%b64u, wakeup=%b64u, limit=%b64u",
		      current, wakeup, limit);
    }
#endif

    if (current == wakeup)
	return 1;
    return erts_thr_progress_has_passed__(current, wakeup);
}
1060
/*
 * Register a wakeup request for the managed thread 'tpd': the thread
 * will be woken some time *after* global thread progress has reached
 * 'value'. If progress already reached 'value', wake immediately.
 */
static void
request_wakeup_managed(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
{
    ErtsThrPrgrManagedWakeupData *mwd;
    int ix, wix;

    /*
     * Only managed threads that aren't in waiting state
     * and aren't deep sleepers are allowed to call this
     * function.
     */

    ASSERT(tpd->is_managed);
    ASSERT(tpd->confirmed != ERTS_THR_PRGR_VAL_WAITING);
    ASSERT(!tpd->is_deep_sleeper);

    if (has_reached_wakeup(value)) {
	wakeup_managed(tpd->id);
	return;
    }

    wix = ERTS_THR_PRGR_WAKEUP_IX(value);
    if (tpd->wakeup_request[wix] == value)
	return; /* Already got a request registered */

    ASSERT(erts_thr_progress_has_passed__(value,
					  tpd->wakeup_request[wix]));


    if (tpd->confirmed == value) {
	/*
	 * We have already confirmed this value. We need to request
	 * wakeup for a value later than our latest confirmed value in
	 * order to prevent progress from reaching the requested value
	 * while we are writing the request.
	 *
	 * It is ok to move the wakeup request forward since the only
	 * guarantee we make (and can make) is that the thread will be
	 * woken some time *after* the requested value has been reached.
	 */
	value++;
	if (value == ERTS_THR_PRGR_VAL_WAITING)
	    value = 0;

	wix = ERTS_THR_PRGR_WAKEUP_IX(value);
	if (tpd->wakeup_request[wix] == value)
	    return; /* Already got a request registered */

	ASSERT(erts_thr_progress_has_passed__(value,
					      tpd->wakeup_request[wix]));
    }

    /* Publish the request: remember it locally, then append our id to
     * the wakeup-data slot for this value. */
    tpd->wakeup_request[wix] = value;

    mwd = intrnl->managed.data[wix];

    ix = erts_atomic32_inc_read_nob(&mwd->len) - 1;
#if ERTS_THR_PRGR_DBG_CHK_WAKEUP_REQUEST_VALUE
    if (ix >= intrnl->managed.no)
	erts_exit(ERTS_ABORT_EXIT, "Internal error: Too many wakeup requests\n");
#endif
    mwd->id[ix] = tpd->id;

    ASSERT(!erts_thr_progress_has_reached(value));

    /*
     * This thread is guaranteed to issue a full memory barrier:
     * - after the request has been written, but
     * - before the global thread progress reach the (possibly
     *   increased) requested wakeup value.
     */
}
1133
1134 static void
request_wakeup_unmanaged(ErtsThrPrgrData * tpd,ErtsThrPrgrVal value)1135 request_wakeup_unmanaged(ErtsThrPrgrData *tpd, ErtsThrPrgrVal value)
1136 {
1137 int wix, ix, id, bit;
1138 ErtsThrPrgrUnmanagedWakeupData *umwd;
1139
1140 ASSERT(!tpd->is_managed);
1141
1142 /*
1143 * Thread progress *can* reach and pass our requested value while
1144 * we are writing the request.
1145 */
1146
1147 if (has_reached_wakeup(value)) {
1148 wakeup_unmanaged(tpd->id);
1149 return;
1150 }
1151
1152 wix = ERTS_THR_PRGR_WAKEUP_IX(value);
1153
1154 if (tpd->wakeup_request[wix] == value)
1155 return; /* Already got a request registered */
1156
1157 ASSERT(erts_thr_progress_has_passed__(value,
1158 tpd->wakeup_request[wix]));
1159
1160 umwd = intrnl->unmanaged.data[wix];
1161
1162 id = tpd->id;
1163
1164 bit = id & ERTS_THR_PRGR_BM_MASK;
1165 ix = id >> ERTS_THR_PRGR_BM_SHIFT;
1166 ASSERT(0 <= ix && ix < umwd->low_sz);
1167 erts_atomic32_read_bor_nob(&umwd->low[ix], 1 << bit);
1168
1169 bit = ix & ERTS_THR_PRGR_BM_MASK;
1170 ix >>= ERTS_THR_PRGR_BM_SHIFT;
1171 ASSERT(0 <= ix && ix < umwd->high_sz);
1172 erts_atomic32_read_bor_nob(&umwd->high[ix], 1 << bit);
1173
1174 erts_atomic32_inc_mb(&umwd->len);
1175
1176 if (erts_thr_progress_has_reached(value))
1177 wakeup_unmanaged(tpd->id);
1178 else
1179 tpd->wakeup_request[wix] = value;
1180 }
1181
1182 void
erts_thr_progress_wakeup(ErtsThrPrgrData * tpd,ErtsThrPrgrVal value)1183 erts_thr_progress_wakeup(ErtsThrPrgrData *tpd,
1184 ErtsThrPrgrVal value)
1185 {
1186
1187 ASSERT(!tpd->is_temporary);
1188 if (tpd->is_managed)
1189 request_wakeup_managed(tpd, value);
1190 else
1191 request_wakeup_unmanaged(tpd, value);
1192 }
1193
/*
 * Wake all unmanaged threads that registered a wakeup request in
 * 'umwd'. Requests are stored as a two-level bitmap: each set bit in
 * the 'high' words marks a 'low' word that contains set bits; each set
 * bit in a 'low' word identifies a thread id. Bitmap words are cleared
 * after the corresponding threads have been woken.
 */
static void
wakeup_unmanaged_threads(ErtsThrPrgrUnmanagedWakeupData *umwd)
{
    int hix;
    for (hix = 0; hix < umwd->high_sz; hix++) {
	erts_aint32_t hmask = erts_atomic32_read_nob(&umwd->high[hix]);
	if (hmask) {
	    int hbase = hix << ERTS_THR_PRGR_BM_SHIFT;
	    int hbit;
	    for (hbit = 0; hbit < ERTS_THR_PRGR_BM_BITS; hbit++) {
		if (hmask & (1U << hbit)) {
		    erts_aint_t lmask;
		    int lix = hbase + hbit;
		    ASSERT(0 <= lix && lix < umwd->low_sz);
		    lmask = erts_atomic32_read_nob(&umwd->low[lix]);
		    if (lmask) {
			int lbase = lix << ERTS_THR_PRGR_BM_SHIFT;
			int lbit;
			for (lbit = 0; lbit < ERTS_THR_PRGR_BM_BITS; lbit++) {
			    if (lmask & (1U << lbit)) {
				/* Bit position encodes the thread id. */
				int id = lbase + lbit;
				wakeup_unmanaged(id);
			    }
			}
			/* Clear the low word only after waking its threads. */
			erts_atomic32_set_nob(&umwd->low[lix], 0);
		    }
		}
	    }
	    erts_atomic32_set_nob(&umwd->high[hix], 0);
	}
    }
}
1226
1227
/*
 * Wake all threads (managed and unmanaged) that requested a wakeup for
 * the thread progress value 'current'. Called when global progress has
 * reached 'current'; resets the wakeup-data slot for reuse.
 */
static void
handle_wakeup_requests(ErtsThrPrgrVal current)
{
    ErtsThrPrgrManagedWakeupData *mwd;
    ErtsThrPrgrUnmanagedWakeupData *umwd;
    int wix, len, i;

    wix = ERTS_THR_PRGR_WAKEUP_IX(current);

    /* Managed threads: ids are stored in a simple array of length 'len'. */
    mwd = intrnl->managed.data[wix];
    len = erts_atomic32_read_nob(&mwd->len);
    ASSERT(len >= 0);
    if (len) {
	for (i = 0; i < len; i++)
	    wakeup_managed(mwd->id[i]);
	erts_atomic32_set_nob(&mwd->len, 0);
    }

    /* Unmanaged threads: ids are encoded in a two-level bitmap. */
    umwd = intrnl->unmanaged.data[wix];
    len = erts_atomic32_read_nob(&umwd->len);
    ASSERT(len >= 0);
    if (len) {
	wakeup_unmanaged_threads(umwd);
	erts_atomic32_set_nob(&umwd->len, 0);
    }

}
1255
1256 static int
got_sched_wakeups(void)1257 got_sched_wakeups(void)
1258 {
1259 int wix;
1260
1261 ERTS_THR_MEMORY_BARRIER;
1262
1263 for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) {
1264 ErtsThrPrgrManagedWakeupData **mwd = intrnl->managed.data;
1265 if (erts_atomic32_read_nob(&mwd[wix]->len))
1266 return 1;
1267 }
1268 for (wix = 0; wix < ERTS_THR_PRGR_WAKEUP_DATA_SIZE; wix++) {
1269 ErtsThrPrgrUnmanagedWakeupData **umwd = intrnl->unmanaged.data;
1270 if (erts_atomic32_read_nob(&umwd[wix]->len))
1271 return 1;
1272 }
1273 return 0;
1274 }
1275
/*
 * Park the calling managed thread while the ERTS_THR_PRGR_LFLG_BLOCK
 * flag is set, using the thread's registered wait callbacks. Returns
 * the last read value of the 'lflgs' flags (read with the BLOCK flag
 * cleared).
 */
static erts_aint32_t
block_thread(ErtsThrPrgrData *tpd)
{
    erts_aint32_t lflgs;
    ErtsThrPrgrCallbacks *cbp = &intrnl->managed.callbacks[tpd->id];

    do {
	/* Tell the blocker we are blocked... */
	block_count_dec();

	while (1) {
	    /* prepare_wait() must precede the flag re-check so a wakeup
	     * between the check and wait() is not lost. */
	    cbp->prepare_wait(cbp->arg);
	    lflgs = erts_atomic32_read_nob(&intrnl->misc.data.lflgs);
	    if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
		cbp->wait(cbp->arg);
	    else
		break;
	}

	/* ...and that we are running again; retry if a new block
	 * started meanwhile (block_count_inc() reports that). */
    } while (block_count_inc());

    cbp->finalize_wait(cbp->arg);

    return lflgs;
}
1300
/*
 * Block all managed threads. Sets the ERTS_THR_PRGR_LFLG_BLOCK flag
 * (parking ourselves first if another blocker already holds it) and
 * then waits — if 'wait' is non-zero — until the block count reaches
 * zero, i.e. all managed threads have blocked. Re-entrant for the same
 * thread via tpd->is_blocking nesting.
 *
 * Returns the remaining block count (0 when 'wait' was set and all
 * threads blocked; 0 immediately on nested calls).
 */
static erts_aint32_t
thr_progress_block(ErtsThrPrgrData *tpd, int wait)
{
    erts_tse_t *event = NULL; /* Remove erroneous warning... sigh... */
    erts_aint32_t lflgs, bc;

    /* Nested block by the same thread; already blocking. */
    if (tpd->is_blocking++)
	return (erts_aint32_t) 0;

    while (1) {
	lflgs = erts_atomic32_read_bor_nob(&intrnl->misc.data.lflgs,
					   ERTS_THR_PRGR_LFLG_BLOCK);
	if (lflgs & ERTS_THR_PRGR_LFLG_BLOCK)
	    block_thread(tpd); /* Someone else is blocking; wait our turn */
	else
	    break; /* We own the block flag now */
    }

#if ERTS_THR_PRGR_PRINT_BLOCKERS
    erts_fprintf(stderr, "block(%d)\n", tpd->id);
#endif

    ASSERT(ERTS_AINT_NULL
	   == erts_atomic_read_nob(&intrnl->misc.data.blocker_event));

    if (wait) {
	/* Publish a thread event on which blocked threads can wake us. */
	event = erts_tse_fetch();
	erts_tse_reset(event);
	erts_atomic_set_nob(&intrnl->misc.data.blocker_event,
			    (erts_aint_t) event);
    }
    /* A managed blocker does not count itself among threads to block. */
    if (tpd->is_managed)
	erts_atomic32_dec_nob(&intrnl->misc.data.block_count);
    /* Clear NOT_BLOCKING so threads start reporting in; full barrier. */
    bc = erts_atomic32_read_band_mb(&intrnl->misc.data.block_count,
				    ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
    bc &= ~ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING;
    if (wait) {
	while (bc != 0) {
	    erts_tse_wait(event);
	    erts_tse_reset(event);
	    bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
	}
    }

    /* tse event returned in erts_thr_progress_unblock() */
    return bc;

}
1349
1350 void
erts_thr_progress_block(void)1351 erts_thr_progress_block(void)
1352 {
1353 thr_progress_block(tmp_thr_prgr_data(NULL), 1);
1354 }
1355
/*
 * Best-effort block of managed threads during a fatal error. Does not
 * wait for completion; pair with erts_thr_progress_fatal_error_wait().
 * 'tmp_tpd_bufp' provides caller-stack storage for thread progress
 * data when none is registered for this thread.
 */
int
erts_thr_progress_fatal_error_block(ErtsThrPrgrData *tmp_tpd_bufp)
{
    ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);

    if (!tpd) {
	/*
	 * We stack allocate since failure to allocate memory may
	 * have caused the problem in the first place. This is ok
	 * since we never complete an unblock after a fatal error
	 * block.
	 */
	tpd = tmp_tpd_bufp;
	init_tmp_thr_prgr_data(tpd);
    }

    /* Returns number of threads that have not yet been blocked */
    return thr_progress_block(tpd, 0);
}
1375
/*
 * Poll — for at most 'timeout' milliseconds — until all managed
 * threads have blocked after erts_thr_progress_fatal_error_block().
 * Gives up on timeout; callers must tolerate partial blocking.
 */
void
erts_thr_progress_fatal_error_wait(SWord timeout) {
    erts_aint32_t bc;
    SWord time_left = timeout;
    ErtsMonotonicTime timeout_time;
    ErtsSchedulerData *esdp = erts_get_scheduler_data();

    /*
     * Counting poll intervals may give us a too long timeout
     * if cpu is busy. We use timeout time to try to prevent
     * this. In case we haven't got time correction this may
     * however fail too...
     */
    timeout_time = erts_get_monotonic_time(esdp);
    timeout_time += ERTS_MSEC_TO_MONOTONIC((ErtsMonotonicTime) timeout);

    while (1) {
	if (erts_milli_sleep(ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL) == 0)
	    time_left -= ERTS_THR_PRGR_FTL_ERR_BLCK_POLL_INTERVAL;
	bc = erts_atomic32_read_acqb(&intrnl->misc.data.block_count);
	if (bc == 0)
	    break; /* Successfully blocked all managed threads */
	if (time_left <= 0)
	    break; /* Timeout */
	if (timeout_time <= erts_get_monotonic_time(esdp))
	    break; /* Timeout */
    }
}
1404
/*
 * Release a block taken via erts_thr_progress_block() (or the
 * thr_progress_block() variants). Handles nesting; on the outermost
 * unblock it restores the block count, clears the BLOCK flag, and
 * wakes all managed threads that are not in waiting state.
 */
void
erts_thr_progress_unblock(void)
{
    erts_tse_t *event;
    int id, break_id, sz, wakeup;
    ErtsThrPrgrData *tpd = thr_prgr_data(NULL);

    ASSERT(tpd->is_blocking);
    /* Nested block still active; nothing to release yet. */
    if (--tpd->is_blocking)
	return;

    sz = intrnl->managed.no;

    wakeup = 1;
    if (!tpd->is_managed)
	/* Unmanaged blocker: start the wakeup sweep at an arbitrary
	 * (id-derived) managed slot and go full circle. */
	id = break_id = tpd->id < 0 ? 0 : tpd->id % sz;
    else {
	/* Managed blocker: wake everyone but ourselves, and re-add
	 * ourselves to the block count we left in thr_progress_block(). */
	break_id = tpd->id;
	id = break_id + 1;
	if (id >= sz)
	    id = 0;
	if (id == break_id)
	    wakeup = 0; /* Only one managed thread: no one else to wake */
	erts_atomic32_inc_nob(&intrnl->misc.data.block_count);
    }

    /* Take back the blocker event published in thr_progress_block(). */
    event = ((erts_tse_t *)
	     erts_atomic_read_nob(&intrnl->misc.data.blocker_event));
    ASSERT(event);
    erts_atomic_set_nob(&intrnl->misc.data.blocker_event, ERTS_AINT_NULL);

    erts_atomic32_read_bor_relb(&intrnl->misc.data.block_count,
				ERTS_THR_PRGR_BC_FLG_NOT_BLOCKING);
#if ERTS_THR_PRGR_PRINT_BLOCKERS
    erts_fprintf(stderr, "unblock(%d)\n", tpd->id);
#endif
    /* Clear the BLOCK flag with a full barrier; managed threads parked
     * in block_thread() will observe this and resume. */
    erts_atomic32_read_band_mb(&intrnl->misc.data.lflgs,
			       ~ERTS_THR_PRGR_LFLG_BLOCK);

    if (wakeup) {
	do {
	    ErtsThrPrgrVal tmp;
	    tmp = read_nob(&intrnl->thr[id].data.current);
	    /* Threads in WAITING state need no wakeup here. */
	    if (tmp != ERTS_THR_PRGR_VAL_WAITING)
		wakeup_managed(id);
	    if (++id >= sz)
		id = 0;
	} while (id != break_id);
    }

    return_tmp_thr_prgr_data(tpd);
    erts_tse_return(event);
}
1458
1459 int
erts_thr_progress_is_blocking(void)1460 erts_thr_progress_is_blocking(void)
1461 {
1462 ErtsThrPrgrData *tpd = perhaps_thr_prgr_data(NULL);
1463 return tpd && tpd->is_blocking;
1464 }
1465
erts_thr_progress_dbg_print_state(void)1466 void erts_thr_progress_dbg_print_state(void)
1467 {
1468 int id;
1469 int sz = intrnl->managed.no;
1470
1471 erts_fprintf(stderr, "--- thread progress ---\n");
1472 erts_fprintf(stderr,"current=%b64u\n", erts_thr_progress_current());
1473 for (id = 0; id < sz; id++) {
1474 ErtsThrPrgrVal current = read_nob(&intrnl->thr[id].data.current);
1475 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1476 erts_aint32_t state_debug;
1477 char *active, *leader;
1478
1479 state_debug = erts_atomic32_read_nob(&intrnl->thr[id].data.state_debug);
1480 active = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_ACTIVE
1481 ? "true"
1482 : "false");
1483 leader = (state_debug & ERTS_THR_PROGRESS_STATE_DEBUG_LEADER
1484 ? "true"
1485 : "false");
1486 #endif
1487 if (current == ERTS_THR_PRGR_VAL_WAITING)
1488 erts_fprintf(stderr,
1489 " id=%d, current=WAITING"
1490 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1491 ", active=%s, leader=%s"
1492 #endif
1493 "\n", id
1494 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1495 , active, leader
1496 #endif
1497 );
1498 else
1499 erts_fprintf(stderr,
1500 " id=%d, current=%b64u"
1501 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1502 ", active=%s, leader=%s"
1503 #endif
1504 "\n", id, current
1505 #ifdef ERTS_THR_PROGRESS_STATE_DEBUG
1506 , active, leader
1507 #endif
1508 );
1509 }
1510 erts_fprintf(stderr, "-----------------------\n");
1511
1512
1513 }
1514
1515