/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2000-2018. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */
#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "sys.h"
#include "erl_sys_driver.h"
#include "global.h"
#include "erl_threads.h"
#include "erl_thr_queue.h"
#include "erl_async.h"
#include "dtrace-wrapper.h"
#include "lttng-wrapper.h"

#define ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ 20

#define ERTS_ASYNC_PRINT_JOB 0

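/*
 * An ErtsAsync describes one async job. The lifecycle, as implemented
 * below: driver_async() allocates the job and enqueues it on an async
 * thread's queue (async_add()); the async thread dequeues it
 * (async_get()), runs async_invoke(), and posts the finished job on
 * the ready queue of the scheduler that issued it (async_reply());
 * that scheduler later picks it up via erts_check_async_ready(),
 * calls async_ready() on the port, and frees the job.
 */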
typedef struct _erl_async {
    DE_Handle*         hndl;   /* The DE_Handle is needed when port is gone */
    Eterm              port;
    long               async_id;
    void*              async_data;
    ErlDrvPDL          pdl;
    void (*async_invoke)(void*);
    void (*async_free)(void*);
    Uint               sched_id;
    union {
        ErtsThrQPrepEnQ_t *prep_enq;
        ErtsThrQFinDeQ_t   fin_deq;
    } q;
} ErtsAsync;

/*
 * We could do without the enqueue mutex, since it isn't needed for
 * thread safety. Its only purpose is to put async threads to sleep
 * during a blast of ready async jobs, in order to reduce contention
 * on the enqueue end of the async ready queues. During such a blast,
 * without the enqueue mutex, the async threads consume a lot of CPU
 * time while making little progress, which in turn slows down the
 * scheduler threads.
 */
#define ERTS_USE_ASYNC_READY_ENQ_MTX 1

#if ERTS_USE_ASYNC_READY_ENQ_MTX

typedef struct {
    erts_mtx_t enq_mtx;
} ErtsAsyncReadyQXData;

#endif

typedef struct {
#if ERTS_USE_ASYNC_READY_ENQ_MTX
    union {
        ErtsAsyncReadyQXData data;
        char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(
                sizeof(ErtsAsyncReadyQXData))];
    } x;
#endif
    ErtsThrQ_t thr_q;
    ErtsThrQFinDeQ_t fin_deq;
} ErtsAsyncReadyQ;

typedef union {
    ErtsAsyncReadyQ arq;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncReadyQ))];
} ErtsAlgndAsyncReadyQ;

typedef struct {
    ErtsThrQ_t thr_q;
    erts_tid_t thr_id;
} ErtsAsyncQ;

typedef union {
    ErtsAsyncQ aq;
    char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncQ))];
} ErtsAlgndAsyncQ;

typedef struct {
    int no_initialized;
    erts_mtx_t mtx;
    erts_cnd_t cnd;
    erts_atomic_t id;
} ErtsAsyncInit;

typedef struct {
    union {
        ErtsAsyncInit data;
        char align__[ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncInit))];
    } init;
    ErtsAlgndAsyncQ *queue;
    ErtsAlgndAsyncReadyQ *ready_queue;
} ErtsAsyncData;
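
/*
 * All of the above lives in one cache-line-aligned block allocated in
 * erts_init_async(): the ErtsAsyncData header, followed by one
 * ErtsAlgndAsyncQ per async thread and one ErtsAlgndAsyncReadyQ per
 * scheduler. The ready queues are indexed by scheduler id (1-based).
 */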

#if defined(USE_VM_PROBES)

/*
 * Some compilers, e.g. GCC 4.2.1 with -O3, will optimize away DTrace
 * calls if they're the last thing in the function.  :-(
 * Many thanks to Trond Norbye, via:
 * https://github.com/memcached/memcached/commit/6298b3978687530bc9d219b6ac707a1b681b2a46
 */
static unsigned gcc_optimizer_hack = 0;
#endif

int erts_async_max_threads; /* Initialized by erl_init.c (the +A flag) */
int erts_async_thread_suggested_stack_size; /* Initialized by erl_init.c (the +a flag) */

static ErtsAsyncData *async;

static void *async_main(void *);

static ERTS_INLINE ErtsAsyncQ *
async_q(int i)
{
    ASSERT(async != NULL);
    return &async->queue[i].aq;
}

static ERTS_INLINE ErtsAsyncReadyQ *
async_ready_q(Uint sched_id)
{
    return &async->ready_queue[((int)sched_id)-1].arq;
}

void
erts_init_async(void)
{
    async = NULL;
    if (erts_async_max_threads > 0) {
        ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT;
        erts_thr_opts_t thr_opts = ERTS_THR_OPTS_DEFAULT_INITER;
        char *ptr, thr_name[32];
        size_t tot_size = 0;
        int i;

        tot_size += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData));
        tot_size += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads;
        tot_size += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers;

        ptr = erts_alloc_permanent_cache_aligned(ERTS_ALC_T_ASYNC_DATA,
                                                 tot_size);

        async = (ErtsAsyncData *) ptr;
        ptr += ERTS_ALC_CACHE_LINE_ALIGN_SIZE(sizeof(ErtsAsyncData));

        async->init.data.no_initialized = 0;
        erts_mtx_init(&async->init.data.mtx, "async_init_mtx", NIL,
            ERTS_LOCK_FLAGS_CATEGORY_SCHEDULER);
        erts_cnd_init(&async->init.data.cnd);
        erts_atomic_init_nob(&async->init.data.id, 0);

        async->queue = (ErtsAlgndAsyncQ *) ptr;
        ptr += sizeof(ErtsAlgndAsyncQ)*erts_async_max_threads;

        qinit.live.queue = ERTS_THR_Q_LIVE_LONG;
        qinit.live.objects = ERTS_THR_Q_LIVE_SHORT;
        qinit.notify = erts_notify_check_async_ready_queue;

        async->ready_queue = (ErtsAlgndAsyncReadyQ *) ptr;
        ptr += sizeof(ErtsAlgndAsyncReadyQ)*erts_no_schedulers;

        for (i = 1; i <= erts_no_schedulers; i++) {
            ErtsAsyncReadyQ *arq = async_ready_q(i);
#if ERTS_USE_ASYNC_READY_ENQ_MTX
            erts_mtx_init(&arq->x.data.enq_mtx, "async_enq_mtx", make_small(i),
                ERTS_LOCK_FLAGS_PROPERTY_STATIC | ERTS_LOCK_FLAGS_CATEGORY_SCHEDULER);
#endif
            erts_thr_q_finalize_dequeue_state_init(&arq->fin_deq);
            qinit.arg = (void *) (SWord) i;
            erts_thr_q_initialize(&arq->thr_q, &qinit);
        }

        /* Create async threads... */

        thr_opts.detached = 0;
        thr_opts.suggested_stack_size
            = erts_async_thread_suggested_stack_size;

        thr_opts.name = thr_name;

        for (i = 0; i < erts_async_max_threads; i++) {
            ErtsAsyncQ *aq = async_q(i);

            erts_snprintf(thr_opts.name, sizeof(thr_name), "async_%d", i+1);

            erts_thr_create(&aq->thr_id, async_main, (void*) aq, &thr_opts);
        }

        /* Wait for async threads to initialize... */

        erts_mtx_lock(&async->init.data.mtx);
        while (async->init.data.no_initialized != erts_async_max_threads)
            erts_cnd_wait(&async->init.data.cnd, &async->init.data.mtx);
        erts_mtx_unlock(&async->init.data.mtx);

        erts_mtx_destroy(&async->init.data.mtx);
        erts_cnd_destroy(&async->init.data.cnd);
    }
}

void *
erts_get_async_ready_queue(Uint sched_id)
{
    return (void *) (async ? async_ready_q(sched_id) : NULL);
}

static ERTS_INLINE void async_add(ErtsAsync *a, ErtsAsyncQ* q)
{
#ifdef USE_VM_PROBES
    int len;
#endif

    if (is_internal_port(a->port)) {
        ErtsAsyncReadyQ *arq = async_ready_q(a->sched_id);
        a->q.prep_enq = erts_thr_q_prepare_enqueue(&arq->thr_q);
        /* make sure the driver will stay around */
        if (a->hndl)
            erts_ddll_reference_referenced_driver(a->hndl);
    }

#if ERTS_ASYNC_PRINT_JOB
    erts_fprintf(stderr, "-> %ld\n", a->async_id);
#endif

    erts_thr_q_enqueue(&q->thr_q, a);
}
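
/*
 * Dequeue the next job from an async thread's queue. If the queue is
 * empty, help clean it, accumulate any pending finalize-dequeue work,
 * and sleep on the thread event until notified of a new job.
 */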
static ERTS_INLINE ErtsAsync *async_get(ErtsThrQ_t *q,
                                        erts_tse_t *tse,
                                        ErtsThrQPrepEnQ_t **prep_enq)
{
    int saved_fin_deq = 0;
    ErtsThrQFinDeQ_t fin_deq;
#ifdef USE_VM_PROBES
    int len;
#endif

    while (1) {
        ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(q);
        if (a) {
            *prep_enq = a->q.prep_enq;
            erts_thr_q_get_finalize_dequeue_data(q, &a->q.fin_deq);
            if (saved_fin_deq)
                erts_thr_q_append_finalize_dequeue_data(&a->q.fin_deq, &fin_deq);
            return a;
        }

        if (ERTS_THR_Q_DIRTY != erts_thr_q_clean(q)) {
            ErtsThrQFinDeQ_t tmp_fin_deq;

            erts_tse_use(tse);
            erts_tse_reset(tse);

        chk_fin_deq:
            if (erts_thr_q_get_finalize_dequeue_data(q, &tmp_fin_deq)) {
                if (!saved_fin_deq) {
                    erts_thr_q_finalize_dequeue_state_init(&fin_deq);
                    saved_fin_deq = 1;
                }
                erts_thr_q_append_finalize_dequeue_data(&fin_deq,
                                                        &tmp_fin_deq);
            }

            switch (erts_thr_q_inspect(q, 1)) {
            case ERTS_THR_Q_DIRTY:
                break;
            case ERTS_THR_Q_NEED_THR_PRGR:
            {
                ErtsThrPrgrVal prgr = erts_thr_q_need_thr_progress(q);
                erts_thr_progress_wakeup(erts_thr_prgr_data(NULL), prgr);
                /*
                 * We do no dequeue finalizing in the hope that a new
                 * async job will arrive before we are woken due to
                 * thread progress...
                 */
                erts_tse_wait(tse);
                break;
            }
            case ERTS_THR_Q_CLEAN:

                if (saved_fin_deq) {
                    if (erts_thr_q_finalize_dequeue(&fin_deq))
                        goto chk_fin_deq;
                    else
                        saved_fin_deq = 0;
                }

                erts_tse_wait(tse);
                break;

            default:
                ASSERT(0);
                break;
            }

            erts_tse_return(tse);
        }
    }
}

static ERTS_INLINE void call_async_ready(ErtsAsync *a)
{
    Port *p = erts_id2port_sflgs(a->port,
                                 NULL,
                                 0,
                                 ERTS_PORT_SFLGS_INVALID_DRIVER_LOOKUP);
    if (!p) {
        if (a->async_free) {
            ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_PORT);
            a->async_free(a->async_data);
            ERTS_MSACC_POP_STATE();
        }
    }
    else {
        if (async_ready(p, a->async_data)) {
            if (a->async_free) {
                ERTS_MSACC_PUSH_AND_SET_STATE(ERTS_MSACC_STATE_PORT);
                a->async_free(a->async_data);
                ERTS_MSACC_POP_STATE();
            }
        }
        erts_port_release(p);
    }
    if (a->pdl)
        driver_pdl_dec_refc(a->pdl);
    if (a->hndl)
        erts_ddll_dereference_driver(a->hndl);
}

static ERTS_INLINE void async_reply(ErtsAsync *a, ErtsThrQPrepEnQ_t *prep_enq)
{
    ErtsAsyncReadyQ *arq;

#if ERTS_ASYNC_PRINT_JOB
    erts_fprintf(stderr, "=>> %ld\n", a->async_id);
#endif

    arq = async_ready_q(a->sched_id);

#if ERTS_USE_ASYNC_READY_ENQ_MTX
    erts_mtx_lock(&arq->x.data.enq_mtx);
#endif

    erts_thr_q_enqueue_prepared(&arq->thr_q, (void *) a, prep_enq);

#if ERTS_USE_ASYNC_READY_ENQ_MTX
    erts_mtx_unlock(&arq->x.data.enq_mtx);
#endif
}

static void
async_wakeup(void *vtse)
{
    erts_tse_set((erts_tse_t *) vtse);
}

static erts_tse_t *async_thread_init(ErtsAsyncQ *aq)
{
    ErtsThrQInit_t qinit = ERTS_THR_Q_INIT_DEFAULT;
    erts_tse_t *tse = erts_tse_fetch();
    ERTS_DECLARE_DUMMY(Uint no);
    ErtsThrPrgrCallbacks callbacks;

    erts_tse_return(tse);

    callbacks.arg = (void *) tse;
    callbacks.wakeup = async_wakeup;
    callbacks.prepare_wait = NULL;
    callbacks.wait = NULL;

    erts_thr_progress_register_unmanaged_thread(&callbacks);

    qinit.live.queue = ERTS_THR_Q_LIVE_LONG;
    qinit.live.objects = ERTS_THR_Q_LIVE_SHORT;
    qinit.arg = (void *) tse;
    qinit.notify = async_wakeup;
    qinit.auto_finalize_dequeue = 0;

    erts_thr_q_initialize(&aq->thr_q, &qinit);

    /* Inform main thread that we are done initializing... */
    erts_mtx_lock(&async->init.data.mtx);
    no = async->init.data.no_initialized++;
    erts_cnd_signal(&async->init.data.cnd);
    erts_mtx_unlock(&async->init.data.mtx);

    erts_msacc_init_thread("async", no, 0);

    return tse;
}
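
/*
 * Async worker thread main loop: block until a job arrives, run its
 * async_invoke() callback, and post the completed job on the ready
 * queue of the scheduler it came from. A job with a NIL port is the
 * shutdown signal (see erts_exit_flush_async()).
 */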
static void *async_main(void *arg)
{
    ErtsAsyncQ *aq = (ErtsAsyncQ *) arg;
    erts_tse_t *tse = async_thread_init(aq);
    ERTS_MSACC_DECLARE_CACHE();

    while (1) {
        ErtsThrQPrepEnQ_t *prep_enq;
        ErtsAsync *a = async_get(&aq->thr_q, tse, &prep_enq);
        if (is_nil(a->port))
            break; /* Time to die */

        ERTS_MSACC_UPDATE_CACHE();

#if ERTS_ASYNC_PRINT_JOB
        erts_fprintf(stderr, "<- %ld\n", a->async_id);
#endif
        ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_PORT);
        a->async_invoke(a->async_data);
        ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_OTHER);

        async_reply(a, prep_enq);
    }

    return NULL;
}

void
erts_exit_flush_async(void)
{
    int i;
    ErtsAsync a;
    a.port = NIL;
    /*
     * Terminate threads in order to flush queues. We do not
     * bother to clean everything up since we are about to
     * terminate the runtime system and a cleanup would only
     * delay the termination.
     */
    for (i = 0; i < erts_async_max_threads; i++)
        async_add(&a, async_q(i));
    for (i = 0; i < erts_async_max_threads; i++)
        erts_thr_join(async->queue[i].aq.thr_id, NULL);
}
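
/*
 * Called from a scheduler thread to drain its async ready queue:
 * handles up to ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ completed jobs in
 * one go. Returns non-zero if jobs may remain in the queue, and zero
 * if the queue was emptied.
 */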
int erts_check_async_ready(void *varq)
{
    ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq;
    int res = 1;
    int i;

    for (i = 0; i < ERTS_MAX_ASYNC_READY_CALLS_IN_SEQ; i++) {
        ErtsAsync *a = (ErtsAsync *) erts_thr_q_dequeue(&arq->thr_q);
        if (!a) {
            res = 0;
            break;
        }

#if ERTS_ASYNC_PRINT_JOB
        erts_fprintf(stderr, "<<= %ld\n", a->async_id);
#endif
        erts_thr_q_append_finalize_dequeue_data(&arq->fin_deq, &a->q.fin_deq);
        call_async_ready(a);
        erts_free(ERTS_ALC_T_ASYNC, (void *) a);
    }

    erts_thr_q_finalize_dequeue(&arq->fin_deq);

    return res;
}

int erts_async_ready_clean(void *varq, void *val)
{
    ErtsAsyncReadyQ *arq = (ErtsAsyncReadyQ *) varq;
    ErtsThrQCleanState_t cstate;

    cstate = erts_thr_q_clean(&arq->thr_q);

    if (erts_thr_q_finalize_dequeue(&arq->fin_deq))
        return ERTS_ASYNC_READY_DIRTY;

    switch (cstate) {
    case ERTS_THR_Q_DIRTY:
        return ERTS_ASYNC_READY_DIRTY;
    case ERTS_THR_Q_NEED_THR_PRGR:
        *((ErtsThrPrgrVal *) val)
            = erts_thr_q_need_thr_progress(&arq->thr_q);
        return ERTS_ASYNC_READY_NEED_THR_PRGR;
    case ERTS_THR_Q_CLEAN:
        break;
    }
    return ERTS_ASYNC_READY_CLEAN;
}

/*
** Generate a fair async key from an ErlDrvPort.
** The port data gives a fair distribution from port pointer
** to unsigned integer - to be used in the key for driver_async below.
*/
unsigned int driver_async_port_key(ErlDrvPort port)
{
    ErlDrvTermData td = driver_mk_port(port);
    if (td == (ErlDrvTermData) NIL) {
        return 0;
    }
    return (unsigned int) (UWord) internal_port_data(td);
}
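
/*
 * Illustrative sketch (not part of this file): a driver that wants all
 * jobs for a given port to hit the same async thread can derive the
 * key from the port. The names my_invoke/my_free/my_data below are
 * hypothetical.
 *
 *     unsigned int key = driver_async_port_key(port);
 *     (void) driver_async(port, &key, my_invoke, my_data, my_free);
 *
 * With a non-NULL key, driver_async() maps the job to queue
 * (*key % erts_async_max_threads), so equal keys serialize on the
 * same async thread.
 */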

/*
** Schedule async_invoke on a worker thread.
** NOTE: will be synchronous when threads are unsupported.
** return values:
**  0  completed
**  -1 error
**  N  handle value
**  arguments:
**      ix             driver index
**      key            pointer to schedule queue (NULL means round robin)
**      async_invoke   function to run in thread
**      async_data     data to pass to invoke function
**      async_free     function to release async_data in case of failure
*/
long driver_async(ErlDrvPort ix, unsigned int* key,
                  void (*async_invoke)(void*), void* async_data,
                  void (*async_free)(void*))
{
    ErtsAsync* a;
    Port* prt;
    long id;
    unsigned int qix;
    Uint sched_id;
    ERTS_MSACC_PUSH_STATE();

    sched_id = erts_get_scheduler_id();
    if (!sched_id)
        sched_id = 1;

    prt = erts_drvport2port(ix);
    if (prt == ERTS_INVALID_ERL_DRV_PORT)
        return -1;

    ERTS_LC_ASSERT(erts_lc_is_port_locked(prt));

    a = (ErtsAsync*) erts_alloc(ERTS_ALC_T_ASYNC, sizeof(ErtsAsync));

    a->sched_id = sched_id;
    a->hndl = (DE_Handle*)prt->drv_ptr->handle;
    a->port = prt->common.id;
    a->pdl = NULL;
    a->async_data = async_data;
    a->async_invoke = async_invoke;
    a->async_free = async_free;

    if (!async)
        id = 0;
    else {
        do {
            id = erts_atomic_inc_read_nob(&async->init.data.id);
        } while (id == 0);
        if (id < 0)
            id *= -1;
        ASSERT(id > 0);
    }

    a->async_id = id;

    if (key == NULL) {
        qix = (erts_async_max_threads > 0)
            ? (id % erts_async_max_threads) : 0;
    }
    else {
        qix = (erts_async_max_threads > 0) ?
            (*key % erts_async_max_threads) : 0;
        *key = qix;
    }
    if (erts_async_max_threads > 0) {
        if (prt->port_data_lock) {
            driver_pdl_inc_refc(prt->port_data_lock);
            a->pdl = prt->port_data_lock;
        }
        async_add(a, async_q(qix));
        return id;
    }

    /* No async threads: run the job synchronously on this thread */
    ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_PORT);
    (*a->async_invoke)(a->async_data);
    ERTS_MSACC_POP_STATE();

    if (async_ready(prt, a->async_data)) {
        if (a->async_free != NULL) {
            ERTS_MSACC_SET_STATE_CACHED(ERTS_MSACC_STATE_PORT);
            (*a->async_free)(a->async_data);
            ERTS_MSACC_POP_STATE();
        }
    }
    erts_free(ERTS_ALC_T_ASYNC, (void *) a);

    return id;
}
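
/*
 * Minimal usage sketch, assuming a hypothetical driver (not part of
 * this file). An async job is scheduled from a port callback; when the
 * job completes, the emulator invokes the driver's ready_async entry.
 * All names prefixed my_/My are illustrative.
 *
 *     typedef struct { ErlDrvPort port; } MyDrvState;
 *     typedef struct { int input; int result; } MyJob;
 *
 *     static void my_invoke(void *data)       // runs on an async thread
 *     {
 *         MyJob *job = data;
 *         job->result = job->input * 2;       // some blocking work
 *     }
 *
 *     static void my_free(void *data)         // failure/cleanup path
 *     {
 *         driver_free(data);
 *     }
 *
 *     static void my_output(ErlDrvData d, char *buf, ErlDrvSizeT len)
 *     {
 *         MyDrvState *st = (MyDrvState *) d;  // port saved at start()
 *         MyJob *job = driver_alloc(sizeof(MyJob));
 *         job->input = 21;
 *         (void) driver_async(st->port, NULL, my_invoke, job, my_free);
 *     }
 *
 *     static void my_ready_async(ErlDrvData d, ErlDrvThreadData adata)
 *     {
 *         MyJob *job = adata;                 // deliver job->result,
 *         driver_free(job);                   // then free the job
 *     }
 */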