1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 2000-2018. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20 #ifdef HAVE_CONFIG_H
21 # include "config.h"
22 #endif
23
24 #include "sys.h"
25 #include "erl_sys_driver.h"
26 #include "global.h"
27 #include "erl_threads.h"
28 #include "erl_thr_queue.h"
29 #include "erl_async.h"
30 #include "dtrace-wrapper.h"
31 #include "lttng-wrapper.h"
32
33 #define ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ 20
34
35 #define ERTS_ASYNC_PRINT_JOB 0
36
37
/*
 * One scheduled async job. Allocated in driver_async() and freed after
 * the job has been delivered back to the port by call_async_ready()
 * (or immediately, when run synchronously without async threads).
 */
typedef struct _erl_async {
    DE_Handle* hndl;   /* The DE_Handle is needed when port is gone */
    Eterm port;        /* Submitting port id; NIL marks a thread-exit job */
    long async_id;     /* Job id, also returned from driver_async() */
    void* async_data;  /* Driver-supplied argument for the callbacks below */
    ErlDrvPDL pdl;     /* Port data lock; refc held while the job is queued */
    void (*async_invoke)(void*); /* Runs on the async thread */
    void (*async_free)(void*);   /* Frees async_data when requested */
    Uint sched_id;     /* Scheduler that submitted the job (1-based) */
    union {
	ErtsThrQPrepEnQ_t *prep_enq; /* While waiting on an async queue */
	ErtsThrQFinDeQ_t fin_deq;    /* While on a scheduler ready queue */
    } q;
} ErtsAsync;
52
53
/*
 * We can do without the enqueue mutex since it isn't needed for
 * thread safety. Its only purpose is to put async threads to sleep
 * during a blast of ready async jobs. This in order to reduce
 * contention on the enqueue end of the async ready queues. During
 * such a blast without the enqueue mutex much cpu time is consumed
 * by the async threads without them doing much progress which in turn
 * slows down progress of scheduler threads.
 */
#define ERTS_USE_ASYNC_READY_ENQ_MTX 1

#if ERTS_USE_ASYNC_READY_ENQ_MTX

/* Enqueue-end state of a ready queue; padded to its own cache line
 * via the union in ErtsAsyncReadyQ below. */
typedef struct {
    erts_mtx_t enq_mtx; /* Throttles enqueueing async threads (see above) */
} ErtsAsyncReadyQXData;

#endif
72
/*
 * Per-scheduler queue of completed async jobs, filled by async_reply()
 * on async threads and drained by erts_check_async_ready() on the
 * owning scheduler.
 */
typedef struct {
#if ERTS_USE_ASYNC_READY_ENQ_MTX
    union {
	ErtsAsyncReadyQXData data;
	char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
			 sizeof(ErtsAsyncReadyQXData))];
    } x; /* Padded so the enqueue mutex gets its own cache line */
#endif
    ErtsThrQ_t thr_q;         /* The ready-job queue itself */
    ErtsThrQFinDeQ_t fin_deq; /* Pending finalize-dequeue operations */
} ErtsAsyncReadyQ;


/* Cache-line-aligned wrapper; ready queues are stored in an array
 * (async->ready_queue), one element per scheduler. */
typedef union {
    ErtsAsyncReadyQ arq;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncReadyQ))];
} ErtsAlgndAsyncReadyQ;
90
91
/* One async worker thread's input queue and thread id. */
typedef struct {
    ErtsThrQ_t thr_q;  /* Jobs waiting to be executed by this thread */
    erts_tid_t thr_id; /* Joined in erts_exit_flush_async() */
} ErtsAsyncQ;

/* Cache-line-aligned wrapper; async queues are stored in an array
 * (async->queue), one element per async thread. */
typedef union {
    ErtsAsyncQ aq;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncQ))];
} ErtsAlgndAsyncQ;

/* Startup handshake between erts_init_async() and the async threads,
 * plus the global async job-id counter. */
typedef struct {
    int no_initialized; /* # threads done with async_thread_init() */
    erts_mtx_t mtx;     /* Protects no_initialized; destroyed after startup */
    erts_cnd_t cnd;     /* Signaled by each thread as it comes up */
    erts_atomic_t id;   /* Source of driver_async() job ids */
} ErtsAsyncInit;
108
/* All global async-thread state; allocated as one permanent,
 * cache-aligned block in erts_init_async(). */
typedef struct {
    union {
	ErtsAsyncInit data;
	char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncInit))];
    } init;
    ErtsAlgndAsyncQ *queue;            /* [erts_async_max_threads] */
    ErtsAlgndAsyncReadyQ *ready_queue; /* [erts_no_schedulers] */
} ErtsAsyncData;
117
#if defined(USE_VM_PROBES)

/*
 * Some compilers, e.g. GCC 4.2.1 and -O3, will optimize away DTrace
 * calls if they're the last thing in the function. :-(
 * Many thanks to Trond Norbye, via:
 * https://github.com/memcached/memcached/commit/6298b3978687530bc9d219b6ac707a1b681b2a46
 */
/* NOTE(review): not referenced in the code visible here; presumably
 * used by DTrace probe sites elsewhere — confirm before removing. */
static unsigned gcc_optimizer_hack = 0;
#endif

int erts_async_max_threads; /* Initialized by erl_init.c */
int erts_async_thread_suggested_stack_size; /* Initialized by erl_init.c */

/* All async-thread state; stays NULL when erts_async_max_threads == 0
 * (driver_async() then runs jobs synchronously). */
static ErtsAsyncData *async;


static void *async_main(void *);
136
137 static ERTS_INLINE ErtsAsyncQ *
async_q(int i)138 async_q(int i)
139 {
140 ASSERT(async != NULL);
141 return &async->queue[i].aq;
142 }
143
144
145 static ERTS_INLINE ErtsAsyncReadyQ *
async_ready_q(Uint sched_id)146 async_ready_q(Uint sched_id)
147 {
148 return &async->ready_queue[((int)sched_id)-1].arq;
149 }
150
151
/*
 * One-time initialization of the async-thread pool: allocates the
 * global state, per-thread job queues and per-scheduler ready queues
 * in a single permanent block, spawns erts_async_max_threads worker
 * threads, and blocks until all of them have finished
 * async_thread_init().
 */
void
erts_init_async(void)
{
    /* No async threads configured: leave 'async' NULL so that
     * driver_async() runs jobs synchronously. */
    async = NULL;
    if (erts_async_max_threads > 0) {
	ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT;
	erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER;
	char *ptr, thr_name[32];
	size_t tot_size = 0;
	int i;

	/* One permanent cache-aligned allocation holds everything:
	 * header, async queues, then ready queues. */
	tot_size += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData));
	tot_size += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads;
	tot_size += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers;

	ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_ASYNC_DATA,
						 tot_size);

	async = (ErtsAsyncData *) ptr;
	ptr += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData));

	async->init.data.no_initialized = 0;
	erts_mtx_init(&async->init.data.mtx, "async_init_mtx", NIL,
	    ERTS_LOCK_FLAGS_CATEGORY_SCHEDULER);
	erts_cnd_init(&async->init.data.cnd);
	erts_atomic_init_nob(&async->init.data.id, 0);

	async->queue = (ErtsAlgndAsyncQ *) ptr;
	ptr += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads;

	/* Ready queues notify the owning scheduler when completed
	 * jobs are enqueued. */
	qinit.live.queue = ERTS_THR_Q_LIVE_LONG;
	qinit.live.objects = ERTS_THR_Q_LIVE_SHORT;
	qinit.notify = erts_notify_check_async_ready_queue;

	async->ready_queue = (ErtsAlgndAsyncReadyQ *) ptr;
	ptr += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers;

	/* Scheduler ids are 1-based. */
	for (i = 1; i <= erts_no_schedulers; i++) {
	    ErtsAsyncReadyQ *arq = async_ready_q(i);
#if ERTS_USE_ASYNC_READY_ENQ_MTX
	    erts_mtx_init(&arq->x.data.enq_mtx, "async_enq_mtx", make_small(i),
		ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_SCHEDULER);
#endif
	    erts_thr_q_finalize_dequeue_state_init(&arq->fin_deq);
	    qinit.arg = (void *) (SWord) i;
	    erts_thr_q_initialize(&arq->thr_q, &qinit);
	}

	/* Create async threads... */

	thr_opts.detached = 0; /* joined in erts_exit_flush_async() */
	thr_opts.suggested_stack_size
	    = erts_async_thread_suggested_stack_size;

	thr_opts.name = thr_name;

	for (i = 0; i < erts_async_max_threads; i++) {
	    ErtsAsyncQ *aq = async_q(i);

	    erts_snprintf(thr_opts.name, sizeof(thr_name), "async_%d", i+1);

	    erts_thr_create(&aq->thr_id, async_main, (void*) aq, &thr_opts);
	}

	/* Wait for async threads to initialize... */

	erts_mtx_lock(&async->init.data.mtx);
	while (async->init.data.no_initialized != erts_async_max_threads)
	    erts_cnd_wait(&async->init.data.cnd, &async->init.data.mtx);
	erts_mtx_unlock(&async->init.data.mtx);

	/* The handshake mutex/condvar are only needed during startup. */
	erts_mtx_destroy(&async->init.data.mtx);
	erts_cnd_destroy(&async->init.data.cnd);

    }
}
230
231
232 void *
erts_get_async_ready_queue(Uint sched_id)233 erts_get_async_ready_queue(Uint sched_id)
234 {
235 return (void *) async ? async_ready_q(sched_id) : NULL;
236 }
237
238
async_add(ErtsAsync * a,ErtsAsyncQ * q)239 static ERTS_INLINE void async_add(ErtsAsync *a, ErtsAsyncQ* q)
240 {
241 #ifdef USE_VM_PROBES
242 int len;
243 #endif
244
245 if (is_internal_port(a->port)) {
246 ErtsAsyncReadyQ *arq = async_ready_q(a->sched_id);
247 a->q.prep_enq = erts_thr_q_prepare_enqueue(&arq->thr_q);
248 /* make sure the driver will stay around */
249 if (a->hndl)
250 erts_ddll_reference_referenced_driver(a->hndl);
251 }
252
253 #if ERTS_ASYNC_PRINT_JOB
254 erts_fprintf(stderr, "-> %ld\n", a->async_id);
255 #endif
256
257 erts_thr_q_enqueue(&q->thr_q, a);
258 }
259
async_get(ErtsThrQ_t * q,erts_tse_t * tse,ErtsThrQPrepEnQ_t ** prep_enq)260 static ERTS_INLINE ErtsAsync *async_get(ErtsThrQ_t *q,
261 erts_tse_t *tse,
262 ErtsThrQPrepEnQ_t **prep_enq)
263 {
264 int saved_fin_deq = 0;
265 ErtsThrQFinDeQ_t fin_deq;
266 #ifdef USE_VM_PROBES
267 int len;
268 #endif
269
270 while (1) {
271 ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(q);
272 if (a) {
273
274 *prep_enq = a->q.prep_enq;
275 erts_thr_q_get_finalize_dequeue_data(q, &a->q.fin_deq);
276 if (saved_fin_deq)
277 erts_thr_q_append_finalize_dequeue_data(&a->q.fin_deq, &fin_deq);
278 return a;
279 }
280
281 if (ERTS_THR_Q_DIRTY != erts_thr_q_clean(q)) {
282 ErtsThrQFinDeQ_t tmp_fin_deq;
283
284 erts_tse_use(tse);
285 erts_tse_reset(tse);
286
287 chk_fin_deq:
288 if (erts_thr_q_get_finalize_dequeue_data(q, &tmp_fin_deq)) {
289 if (!saved_fin_deq) {
290 erts_thr_q_finalize_dequeue_state_init(&fin_deq);
291 saved_fin_deq = 1;
292 }
293 erts_thr_q_append_finalize_dequeue_data(&fin_deq,
294 &tmp_fin_deq);
295 }
296
297 switch (erts_thr_q_inspect(q, 1)) {
298 case ERTS_THR_Q_DIRTY:
299 break;
300 case ERTS_THR_Q_NEED_THR_PRGR:
301 {
302 ErtsThrPrgrVal prgr = erts_thr_q_need_thr_progress(q);
303 erts_thr_progress_wakeup(erts_thr_prgr_data(NULL), prgr);
304 /*
305 * We do no dequeue finalizing in hope that a new async
306 * job will arrive before we are woken due to thread
307 * progress...
308 */
309 erts_tse_wait(tse);
310 break;
311 }
312 case ERTS_THR_Q_CLEAN:
313
314 if (saved_fin_deq) {
315 if (erts_thr_q_finalize_dequeue(&fin_deq))
316 goto chk_fin_deq;
317 else
318 saved_fin_deq = 0;
319 }
320
321 erts_tse_wait(tse);
322 break;
323
324 default:
325 ASSERT(0);
326 break;
327 }
328
329 erts_tse_return(tse);
330 }
331 }
332 }
333
call_async_ready(ErtsAsync * a)334 static ERTS_INLINE void call_async_ready(ErtsAsync *a)
335 {
336 Port *p = erts_id2port_sflgs(a->port,
337 NULL,
338 0,
339 ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
340 if (!p) {
341 if (a->async_free) {
342 ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_PORT);
343 a->async_free(a->async_data);
344 ERTS_MSACC_POP_STATE();
345 }
346 }
347 else {
348 if (async_ready(p, a->async_data)) {
349 if (a->async_free) {
350 ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_PORT);
351 a->async_free(a->async_data);
352 ERTS_MSACC_POP_STATE();
353 }
354 }
355 erts_port_release(p);
356 }
357 if (a->pdl)
358 driver_pdl_dec_refc(a->pdl);
359 if (a->hndl)
360 erts_ddll_dereference_driver(a->hndl);
361 }
362
async_reply(ErtsAsync * a,ErtsThrQPrepEnQ_t * prep_enq)363 static ERTS_INLINE void async_reply(ErtsAsync *a, ErtsThrQPrepEnQ_t *prep_enq)
364 {
365 ErtsAsyncReadyQ *arq;
366
367 #if ERTS_ASYNC_PRINT_JOB
368 erts_fprintf(stderr, "=>> %ld\n", a->async_id);
369 #endif
370
371 arq = async_ready_q(a->sched_id);
372
373 #if ERTS_USE_ASYNC_READY_ENQ_MTX
374 erts_mtx_lock(&arq->x.data.enq_mtx);
375 #endif
376
377 erts_thr_q_enqueue_prepared(&arq->thr_q, (void *) a, prep_enq);
378
379 #if ERTS_USE_ASYNC_READY_ENQ_MTX
380 erts_mtx_unlock(&arq->x.data.enq_mtx);
381 #endif
382
383 }
384
385
/* Notify callback (for thread queues and thread progress): wake the
 * async thread sleeping on thread event 'vtse'. */
static void
async_wakeup(void *vtse)
{
    erts_tse_set((erts_tse_t *) vtse);
}
391
async_thread_init(ErtsAsyncQ * aq)392 static erts_tse_t *async_thread_init(ErtsAsyncQ *aq)
393 {
394 ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT;
395 erts_tse_t *tse = erts_tse_fetch();
396 ERTS_DECLARE_DUMMY(Uint no);
397 ErtsThrPrgrCallbacks callbacks;
398
399 erts_tse_return(tse);
400
401 callbacks.arg = (void *) tse;
402 callbacks.wakeup = async_wakeup;
403 callbacks.prepare_wait = NULL;
404 callbacks.wait = NULL;
405
406 erts_thr_progress_register_unmanaged_thread(&callbacks);
407
408 qinit.live.queue = ERTS_THR_Q_LIVE_LONG;
409 qinit.live.objects = ERTS_THR_Q_LIVE_SHORT;
410 qinit.arg = (void *) tse;
411 qinit.notify = async_wakeup;
412 qinit.auto_finalize_dequeue = 0;
413
414 erts_thr_q_initialize(&aq->thr_q, &qinit);
415
416 /* Inform main thread that we are done initializing... */
417 erts_mtx_lock(&async->init.data.mtx);
418 no = async->init.data.no_initialized++;
419 erts_cnd_signal(&async->init.data.cnd);
420 erts_mtx_unlock(&async->init.data.mtx);
421
422 erts_msacc_init_thread("async", no, 0);
423
424 return tse;
425 }
426
async_main(void * arg)427 static void *async_main(void* arg)
428 {
429 ErtsAsyncQ *aq = (ErtsAsyncQ *) arg;
430 erts_tse_t *tse = async_thread_init(aq);
431 ERTS_MSACC_DECLARE_CACHE();
432
433 while (1) {
434 ErtsThrQPrepEnQ_t *prep_enq;
435 ErtsAsync *a = async_get(&aq->thr_q, tse, &prep_enq);
436 if (is_nil(a->port))
437 break; /* Time to die */
438
439 ERTS_MSACC_UPDATE_CACHE();
440
441 #if ERTS_ASYNC_PRINT_JOB
442 erts_fprintf(stderr, "<- %ld\n", a->async_id);
443 #endif
444 ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_PORT);
445 a->async_invoke(a->async_data);
446 ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_OTHER);
447
448 async_reply(a, prep_enq);
449 }
450
451 return NULL;
452 }
453
454
455 void
erts_exit_flush_async(void)456 erts_exit_flush_async(void)
457 {
458 int i;
459 ErtsAsync a;
460 a.port = NIL;
461 /*
462 * Terminate threads in order to flush queues. We do not
463 * bother to clean everything up since we are about to
464 * terminate the runtime system and a cleanup would only
465 * delay the termination.
466 */
467 for (i = 0; i < erts_async_max_threads; i++)
468 async_add(&a, async_q(i));
469 for (i = 0; i < erts_async_max_threads; i++)
470 erts_thr_join(async->queue[i].aq.thr_id, NULL);
471 }
472
erts_check_async_ready(void * varq)473 int erts_check_async_ready(void *varq)
474 {
475 ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq;
476 int res = 1;
477 int i;
478
479 for (i = 0; i < ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ; i++) {
480 ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(&arq->thr_q);
481 if (!a) {
482 res = 0;
483 break;
484 }
485
486 #if ERTS_ASYNC_PRINT_JOB
487 erts_fprintf(stderr, "<<= %ld\n", a->async_id);
488 #endif
489 erts_thr_q_append_finalize_dequeue_data(&arq->fin_deq, &a->q.fin_deq);
490 call_async_ready(a);
491 erts_free(ERTS_ALC_T_ASYNC, (void *) a);
492 }
493
494 erts_thr_q_finalize_dequeue(&arq->fin_deq);
495
496 return res;
497 }
498
erts_async_ready_clean(void * varq,void * val)499 int erts_async_ready_clean(void *varq, void *val)
500 {
501 ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq;
502 ErtsThrQCleanState_t cstate;
503
504 cstate = erts_thr_q_clean(&arq->thr_q);
505
506 if (erts_thr_q_finalize_dequeue(&arq->fin_deq))
507 return ERTS_ASYNC_READY_DIRTY;
508
509 switch (cstate) {
510 case ERTS_THR_Q_DIRTY:
511 return ERTS_ASYNC_READY_DIRTY;
512 case ERTS_THR_Q_NEED_THR_PRGR:
513 *((ErtsThrPrgrVal *) val)
514 = erts_thr_q_need_thr_progress(&arq->thr_q);
515 return ERTS_ASYNC_READY_NEED_THR_PRGR;
516 case ERTS_THR_Q_CLEAN:
517 break;
518 }
519 return ERTS_ASYNC_READY_CLEAN;
520 }
521
522
/*
** Generate a fair async key from an ErlDrvPort.
** The port data gives a fair distribution from port pointer
** to unsigned integer - to be used in key for driver_async below.
*/
unsigned int driver_async_port_key(ErlDrvPort port)
{
    ErlDrvTermData td = driver_mk_port(port);
    if (td == (ErlDrvTermData) NIL) {
	/* Invalid port: no port data to hash on; fall back to key 0. */
	return 0;
    }
    return (unsigned int) (UWord) internal_port_data(td);
}
536
/*
** Schedule async_invoke on a worker thread
** NOTE will be synchronous when threads are unsupported
** return values:
**  0 completed
**  -1 error
**  N handle value
** arguments:
**  ix driver index
**  key pointer to schedule queue (NULL means round robin)
**  async_invoke function to run in thread
**  async_data data to pass to invoke function
**  async_free function for releasing async_data in case of failure
*/
long driver_async(ErlDrvPort ix, unsigned int* key,
		  void (*async_invoke)(void*), void* async_data,
		  void (*async_free)(void*))
{
    ErtsAsync* a;
    Port* prt;
    long id;
    unsigned int qix;
    Uint sched_id;
    ERTS_MSACC_PUSH_STATE();

    /* Non-scheduler callers (sched id 0) are mapped to scheduler 1's
     * ready queue. */
    sched_id = erts_get_scheduler_id();
    if (!sched_id)
	sched_id = 1;

    prt = erts_drvport2port(ix);
    if (prt == ERTS_INVALID_ERL_DRV_PORT)
	return -1;

    ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));

    a = (ErtsAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErtsAsync));

    a->sched_id = sched_id;
    a->hndl = (DE_Handle*)prt->drv_ptr->handle;
    a->port = prt->common.id;
    a->pdl = NULL;
    a->async_data = async_data;
    a->async_invoke = async_invoke;
    a->async_free = async_free;

    /* Draw a non-zero job id from the shared counter; negative values
     * (counter wrap) are mirrored to keep the id positive.  Id 0 is
     * only used when no async threads exist. */
    if (!async)
	id = 0;
    else {
	do {
	    id = erts_atomic_inc_read_nob(&async->init.data.id);
	} while (id == 0);
	if (id < 0)
	    id *= -1;
	ASSERT(id > 0);
    }

    a->async_id = id;

    /* Queue selection: round robin on the job id, or hash on *key so
     * jobs with the same key stay ordered on one thread. */
    if (key == NULL) {
	qix = (erts_async_max_threads > 0)
	    ? (id % erts_async_max_threads) : 0;
    }
    else {
	qix = (erts_async_max_threads > 0) ?
	    (*key % erts_async_max_threads) : 0;
	*key = qix;
    }
    if (erts_async_max_threads > 0) {
	/* Keep the port data lock alive while the job is in flight;
	 * call_async_ready() drops the reference. */
	if (prt->port_data_lock) {
	    driver_pdl_inc_refc(prt->port_data_lock);
	    a->pdl = prt->port_data_lock;
	}
	async_add(a, async_q(qix));
	return id;
    }

    /* No async threads: run the job synchronously on this thread,
     * with the port lock still held. */
    ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_PORT);
    (*a->async_invoke)(a->async_data);
    ERTS_MSACC_POP_STATE();

    if (async_ready(prt, a->async_data)) {
	if (a->async_free != NULL) {
	    ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_PORT);
	    (*a->async_free)(a->async_data);
	    ERTS_MSACC_POP_STATE();
	}
    }
    erts_free(ERTS_ALC_T_ASYNC, (void *) a);

    return id;
}
628