/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2006-2020. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */

/*
 * Description:	Scheduling of port tasks
 *
 * Author: 	Rickard Green
 */

#define ERL_PORT_TASK_C__

#ifdef HAVE_CONFIG_H
#  include "config.h"
#endif

#include "global.h"
#include "erl_port_task.h"
#include "dist.h"
#include "erl_check_io.h"
#include "dtrace-wrapper.h"
#include "lttng-wrapper.h"
#include <stdarg.h>

/*
 * ERTS_PORT_CALLBACK_VREDS: Virtual reductions charged per executed
 * callback in order to limit the number of callback calls made during
 * one port time slice (CONTEXT_REDS/20 allows at most about 20 calls).
 */
#define ERTS_PORT_CALLBACK_VREDS (CONTEXT_REDS/20)

#if defined(DEBUG) && 0
#define ERTS_HARD_DEBUG_TASK_QUEUES
#else
#undef ERTS_HARD_DEBUG_TASK_QUEUES
#endif

#ifdef ERTS_HARD_DEBUG_TASK_QUEUES
static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue);
#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) \
    chk_task_queues((PP), (EQ), (PBQ))
#else
#define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ)
#endif

#ifdef USE_VM_PROBES
#define DTRACE_DRIVER(PROBE_NAME, PP)                              \
    if (DTRACE_ENABLED(PROBE_NAME)) {                              \
        DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);         \
        DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);            \
                                                                   \
        dtrace_pid_str(ERTS_PORT_GET_CONNECTED(PP), process_str);  \
        dtrace_port_str(PP, port_str);                             \
        DTRACE3(PROBE_NAME, process_str, port_str, PP->name);      \
    }
#else
#define DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0)
#endif
#ifdef USE_LTTNG_VM_TRACEPOINTS
#define LTTNG_DRIVER(TRACEPOINT, PP)                              \
    if (LTTNG_ENABLED(TRACEPOINT)) {                              \
        lttng_decl_portbuf(port_str);                             \
        lttng_decl_procbuf(proc_str);                             \
        lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(PP), proc_str);  \
        lttng_port_to_str((PP), port_str);                        \
        LTTNG3(TRACEPOINT, proc_str, port_str, (PP)->name);       \
    }
#else
#define LTTNG_DRIVER(TRACEPOINT, PP) do {} while(0)
#endif

#define ERTS_LC_VERIFY_RQ(RQ, PP)				\
    do {							\
	ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));		\
	ERTS_LC_ASSERT((RQ) == erts_get_runq_port((PP)));       \
    } while (0)

#define ERTS_PT_STATE_SCHEDULED		0
#define ERTS_PT_STATE_ABORTED		1
#define ERTS_PT_STATE_EXECUTING		2
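
/*
 * A task is created in state SCHEDULED. It moves to EXECUTING when a
 * scheduler picks it up for execution, or to ABORTED via
 * erts_port_task_abort(). Both transitions are made with a
 * compare-and-swap from SCHEDULED, so only one of them can win for a
 * given task.
 */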

typedef union {
    struct { /* I/O tasks */
	ErlDrvEvent event;
#if ERTS_POLL_USE_SCHEDULER_POLLING
        int is_scheduler_event;
#endif
    } io;
    struct {
	ErtsProc2PortSigCallback callback;
	ErtsProc2PortSigData data;
    } psig;
} ErtsPortTaskTypeData;

struct ErtsPortTask_ {
    erts_atomic32_t state;
    ErtsPortTaskType type;
    union {
	struct {
	    ErtsPortTask *next;
	    ErtsPortTaskHandle *handle;
	    int flags;
	    Uint32 ref[ERTS_REF_NUMBERS];
	    ErtsPortTaskTypeData td;
	} alive;
	ErtsThrPrgrLaterOp release;
    } u;
};

struct ErtsPortTaskHandleList_ {
    ErtsPortTaskHandle handle;
    union {
	ErtsPortTaskHandleList *next;
	ErtsThrPrgrLaterOp release;
    } u;
};

typedef struct ErtsPortTaskBusyCaller_ ErtsPortTaskBusyCaller;
struct ErtsPortTaskBusyCaller_ {
    ErtsPortTaskBusyCaller *next;
    Eterm caller;
    SWord count;
    ErtsPortTask *last;
};

#define ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS 17
struct ErtsPortTaskBusyCallerTable_ {
    ErtsPortTaskBusyCaller *bucket[ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS];
    ErtsPortTaskBusyCaller pre_alloc_busy_caller;
};
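
/*
 * The busy caller table maps a caller (pid) to the number of its tasks
 * currently sitting in the busy queue, plus a pointer to the last such
 * task. It is a small chained hash table indexed via caller2bix()
 * below. One entry is pre-allocated in the table itself, so the common
 * case of a single busy caller needs no extra allocation.
 */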

#if ERTS_POLL_USE_SCHEDULER_POLLING
erts_atomic_t erts_port_task_outstanding_io_tasks;
#endif

static void begin_port_cleanup(Port *pp,
			       ErtsPortTask **execq,
			       int *processing_busy_q_p);

ERTS_THR_PREF_QUICK_ALLOC_IMPL(port_task,
                               ErtsPortTask,
                               1000,
                               ERTS_ALC_T_PORT_TASK)

ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table,
				 ErtsPortTaskBusyCallerTable,
				 50,
				 ERTS_ALC_T_BUSY_CALLER_TAB)

static void
call_port_task_free(void *vptp)
{
    port_task_free((ErtsPortTask *) vptp);
}

static ERTS_INLINE void
schedule_port_task_free(ErtsPortTask *ptp)
{
    erts_schedule_thr_prgr_later_cleanup_op(call_port_task_free,
					    (void *) ptp,
					    &ptp->u.release,
					    sizeof(ErtsPortTask));
}

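/*
 * Map a signal data pointer back to the task embedding it; a plain
 * container_of style offset computation.
 */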
static ERTS_INLINE ErtsPortTask *
p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp)
{
    ErtsPortTask *ptp;
    char *ptr = (char *) sigdp;
    ptr -= offsetof(ErtsPortTask, u.alive.td.psig.data);
    ptp = (ErtsPortTask *) ptr;
    ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
    return ptp;
}

static ERTS_INLINE ErtsProc2PortSigData *
p2p_sig_data_init(ErtsPortTask *ptp)
{

    ptp->type = ERTS_PORT_TASK_PROC_SIG;
    ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP;
    erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);

    ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data));

    return &ptp->u.alive.td.psig.data;
}

ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data(void)
{
    ErtsPortTask *ptp = port_task_alloc();

    return p2p_sig_data_init(ptp);
}

ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data_extra(size_t extra, void **extra_ptr)
{
    ErtsPortTask *ptp = erts_alloc(ERTS_ALC_T_PORT_TASK,
                                   sizeof(ErtsPortTask) + extra);

    *extra_ptr = ptp+1;

    return p2p_sig_data_init(ptp);
}

void
erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData *sigdp)
{
    schedule_port_task_free(p2p_sig_data_to_task(sigdp));
}

static ERTS_INLINE Eterm
task_caller(ErtsPortTask *ptp)
{
    Eterm caller;

    ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);

    caller = ptp->u.alive.td.psig.data.caller;

    ASSERT(is_internal_pid(caller) || is_internal_port(caller));

    return caller;
}

/*
 * Busy queue management
 */
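
/*
 * When the port is busy, tasks that must wait for the port to become
 * non-busy (ERTS_PT_FLG_WAIT_BUSY) are parked in a separate busy queue.
 * Tasks with a signal-order dependency (ERTS_PT_FLG_SIG_DEP) on a task
 * already in the busy queue have to follow it into the busy queue as
 * well; the busy caller table above makes that dependency check cheap.
 */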

static ERTS_INLINE int
caller2bix(Eterm caller)
{
    ASSERT(is_internal_pid(caller) || is_internal_port(caller));
    return (int) (_GET_PID_DATA(caller) % ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS);
}


static void
popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last)
{
    ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp;
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    Eterm caller = task_caller(ptp);
    int bix = caller2bix(caller);

    ASSERT(is_internal_pid(caller));

    ASSERT(tabp);
    bcp = tabp->bucket[bix];
    prev_bcpp = &tabp->bucket[bix];
    ASSERT(bcp);
    while (bcp->caller != caller) {
	prev_bcpp = &bcp->next;
	bcp = bcp->next;
	ASSERT(bcp);
    }
    ASSERT(bcp->count > 0);
    if (--bcp->count != 0) {
	ASSERT(!last);
    }
    else {
	*prev_bcpp = bcp->next;
	if (bcp == &tabp->pre_alloc_busy_caller)
	    bcp->caller = am_undefined;
	else
	    erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
	if (last) {
#ifdef DEBUG
	    erts_aint32_t flags =
#endif
		erts_atomic32_read_band_nob(
		    &pp->sched.flags,
		    ~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
	    ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
#ifdef DEBUG
	    for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
		ASSERT(!tabp->bucket[bix]);
	    }
#endif
	    busy_caller_table_free(tabp);
	    pp->sched.taskq.local.busy.first = NULL;
	    pp->sched.taskq.local.busy.last = NULL;
	    pp->sched.taskq.local.busy.table = NULL;
	}
    }
}

static void
busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    Eterm caller = task_caller(ptp);
    ErtsPortTaskBusyCaller *bcp;
    int bix;

    ASSERT(is_internal_pid(caller));
    /*
     * The port is busy and this task type needs to wait until the port
     * is no longer busy.
     */

    ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY);

    ptp->u.alive.next = NULL;
    if (pp->sched.taskq.local.busy.last) {
	ASSERT(pp->sched.taskq.local.busy.first);
	pp->sched.taskq.local.busy.last->u.alive.next = ptp;
    }
    else {
	int i;
#ifdef DEBUG
	erts_aint32_t flags;
#endif
	pp->sched.taskq.local.busy.first = ptp;

#ifdef DEBUG
	flags =
#endif
	    erts_atomic32_read_bor_nob(&pp->sched.flags,
					   ERTS_PTS_FLG_HAVE_BUSY_TASKS);
	ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS));

	ASSERT(!tabp);

	tabp = busy_caller_table_alloc();
	pp->sched.taskq.local.busy.table = tabp;
	for (i = 0; i < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; i++)
	    tabp->bucket[i] = NULL;
	tabp->pre_alloc_busy_caller.caller = am_undefined;
    }
    pp->sched.taskq.local.busy.last = ptp;

    bix = caller2bix(caller);
    ASSERT(tabp);
    bcp = tabp->bucket[bix];

    while (bcp && bcp->caller != caller)
	bcp = bcp->next;

    if (bcp)
	bcp->count++;
    else {
	if (tabp->pre_alloc_busy_caller.caller == am_undefined)
	    bcp = &tabp->pre_alloc_busy_caller;
	else
	    bcp = erts_alloc(ERTS_ALC_T_BUSY_CALLER,
			     sizeof(ErtsPortTaskBusyCaller));
	bcp->caller = caller;
	bcp->count = 1;
	bcp->next = tabp->bucket[bix];
	tabp->bucket[bix] = bcp;
    }

    bcp->last = ptp;
}

static ERTS_INLINE int
check_sig_dep_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTask *last_ptp;
    ErtsPortTaskBusyCaller *bcp;
    int bix;
    Eterm caller;

    ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP);
    ASSERT(pp->sched.taskq.local.busy.last);
    ASSERT(tabp);


    /*
     * Either the port is not busy, or this task does not imply waiting
     * on a busy port. However, due to the signal ordering requirements
     * the task might still depend on other tasks in the busy queue.
     */

    caller = task_caller(ptp);
    bix = caller2bix(caller);
    bcp = tabp->bucket[bix];
    while (bcp && bcp->caller != caller)
	bcp = bcp->next;

    if (!bcp)
	return 0;

    /*
     * There are other tasks from the same caller in the busy queue that
     * we depend on; move this task into the busy queue as well.
     */

    bcp->count++;
    last_ptp = bcp->last;
    ptp->u.alive.next = last_ptp->u.alive.next;
    if (!ptp->u.alive.next) {
	ASSERT(pp->sched.taskq.local.busy.last == last_ptp);
	pp->sched.taskq.local.busy.last = ptp;
    }
    last_ptp->u.alive.next = ptp;
    bcp->last = ptp;

    return 1;
}

static void
no_sig_dep_move_from_busyq(Port *pp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTask *first_ptp, *last_ptp, *ptp;
    ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp = NULL;

    /*
     * Move tasks at the head of the busy queue that no longer
     * have any dependencies on busy-wait tasks into the ordinary
     * queue.
     */

    first_ptp = ptp = pp->sched.taskq.local.busy.first;

    ASSERT(ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
    ASSERT(tabp);

    do {
	Eterm caller = task_caller(ptp);

	if (!bcp || bcp->caller != caller) {
	    int bix = caller2bix(caller);

	    prev_bcpp = &tabp->bucket[bix];
	    bcp = tabp->bucket[bix];
	    ASSERT(bcp);
	    while (bcp->caller != caller) {
		ASSERT(bcp);
		prev_bcpp = &bcp->next;
		bcp = bcp->next;
	    }
	}

	ASSERT(bcp->caller == caller);
	ASSERT(bcp->count > 0);

	if (--bcp->count == 0) {
	    *prev_bcpp = bcp->next;
	    if (bcp == &tabp->pre_alloc_busy_caller)
		bcp->caller = am_undefined;
	    else
		erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
	}

	last_ptp = ptp;
	ptp = ptp->u.alive.next;
    } while (ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));

    pp->sched.taskq.local.busy.first = last_ptp->u.alive.next;
    if (!pp->sched.taskq.local.busy.first) {
#ifdef DEBUG
	int bix;
	erts_aint32_t flags =
#endif
	    erts_atomic32_read_band_nob(
		&pp->sched.flags,
		~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
	ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
#ifdef DEBUG
	for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
	    ASSERT(!tabp->bucket[bix]);
	}
#endif
	busy_caller_table_free(tabp);
	pp->sched.taskq.local.busy.last = NULL;
	pp->sched.taskq.local.busy.table = NULL;
    }
    last_ptp->u.alive.next = pp->sched.taskq.local.first;
    pp->sched.taskq.local.first = first_ptp;
}

#ifdef ERTS_HARD_DEBUG_TASK_QUEUES

static void
chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue)
{
    Sint tot_count, tot_table_count;
    int bix;
    ErtsPortTask *ptp, *last;
    ErtsPortTask *first = processing_busy_queue ? execq : pp->sched.taskq.local.busy.first;
    ErtsPortTask *nb_task_queue = processing_busy_queue ? pp->sched.taskq.local.first : execq;
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTaskBusyCaller *bcp;

    if (!first) {
	ASSERT(!tabp);
	ASSERT(!pp->sched.taskq.local.busy.last);
	ASSERT(!(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
	return;
    }

    ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
    ASSERT(tabp);

    tot_count = 0;
    ptp = first;
    while (ptp) {
	Sint count = 0;
	Eterm caller = task_caller(ptp);
	int bix = caller2bix(caller);
	for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
	    if (bcp->caller == caller)
		break;
	ASSERT(bcp && bcp->caller == caller);

	ASSERT(bcp->last);
	while (1) {
	    ErtsPortTask *ptp2;

	    ASSERT(caller == task_caller(ptp));
	    count++;
	    tot_count++;
	    last = ptp;

	    for (ptp2 = nb_task_queue; ptp2; ptp2 = ptp2->u.alive.next) {
		ASSERT(ptp != ptp2);
	    }

	    if (ptp == bcp->last)
		break;
	    ptp = ptp->u.alive.next;
	}

	ASSERT(count == bcp->count);
	ptp = ptp->u.alive.next;
    }

    tot_table_count = 0;
    for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
	for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
	    tot_table_count += bcp->count;
    }

    ASSERT(tot_count == tot_table_count);

    ASSERT(last == pp->sched.taskq.local.busy.last);
}

#endif /* ERTS_HARD_DEBUG_TASK_QUEUES */

/*
 * Task handle manipulation.
 */
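
/*
 * A task handle is an atomic word that either points to the scheduled
 * task or is NULL. Writers publish the task pointer with a release
 * store (set_handle) and readers pick it up with an acquire load
 * (handle2task), so a non-NULL handle always refers to a fully
 * initialized task.
 */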

static ERTS_INLINE void
reset_port_task_handle(ErtsPortTaskHandle *pthp)
{
    erts_atomic_set_relb(pthp, (erts_aint_t) NULL);
}

static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle *pthp)
{
    return (ErtsPortTask *) erts_atomic_read_acqb(pthp);
}

static ERTS_INLINE void
reset_handle(ErtsPortTask *ptp)
{
    if (ptp->u.alive.handle) {
	ASSERT(ptp == handle2task(ptp->u.alive.handle));
	reset_port_task_handle(ptp->u.alive.handle);
    }
}

static ERTS_INLINE void
reset_executed_io_task_handle(Port *prt, ErtsPortTask *ptp)
{
    if (ptp->u.alive.handle) {
	ASSERT(ptp == handle2task(ptp->u.alive.handle));
#if ERTS_POLL_USE_SCHEDULER_POLLING
        if (ptp->u.alive.td.io.is_scheduler_event) {
            if ((erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLG_CHECK_FD_CLEANUP)) {
                erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle,
                                                  reset_port_task_handle);
                erts_atomic32_read_band_nob(&prt->state, ~ERTS_PORT_SFLG_CHECK_FD_CLEANUP);
            } else {
                reset_port_task_handle(ptp->u.alive.handle);
            }
        } else
#endif
        {
            /* The port task handle is reset inside task_executed */
            erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle,
                                              reset_port_task_handle);
        }
    }
}

static ERTS_INLINE void
set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
    ptp->u.alive.handle = pthp;
    if (pthp) {
	erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
	ASSERT(ptp == handle2task(ptp->u.alive.handle));
    }
}

static ERTS_INLINE void
set_tmp_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
{
    ptp->u.alive.handle = NULL;
    if (pthp) {
	/*
	 * IMPORTANT! The task either needs to be aborted, or the task
	 * handle needs to be detached, before thread progress has been
	 * made.
	 */
	erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
    }
}


/*
 * Busy port queue management
 */
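
/*
 * The busy port queue tracks the number of bytes of command data queued
 * from processes to the port. When the size grows above the high limit
 * the port is marked busy (ERTS_PTS_FLG_BUSY_PORT_Q) and sending
 * processes are suspended; when it shrinks below the low limit,
 * check_unset_busy_port_q() clears the flag and resumes them.
 */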

static erts_aint32_t
check_unset_busy_port_q(Port *pp,
			erts_aint32_t flags,
			ErtsPortTaskBusyPortQ *bpq)
{
    ErlDrvSizeT qsize, low;
    int resume_procs = 0;

    ASSERT(bpq);
    ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));

    erts_port_task_sched_lock(&pp->sched);
    qsize = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
    low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
    if (qsize < low) {
	erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
			       | ERTS_PTS_FLG_BUSY_PORT_Q);
	flags = erts_atomic32_read_band_relb(&pp->sched.flags, mask);
	if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
	    resume_procs = 1;
    }
    else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
	flags = erts_atomic32_read_band_relb(&pp->sched.flags,
						 ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
	flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
    }
    erts_port_task_sched_unlock(&pp->sched);
    if (resume_procs)
	erts_port_resume_procs(pp);

    return flags;
}

static ERTS_INLINE void
aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
	return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
						      (erts_aint_t) -size);
    ASSERT(qsz + size > qsz);
    flags = erts_atomic32_read_nob(&pp->sched.flags);
    ASSERT(pp->sched.taskq.bpq);
    if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
		  | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q)
	return;
    if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low))
	erts_atomic32_read_bor_nob(&pp->sched.flags,
				       ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}

static ERTS_INLINE void
dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
	return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
						      (erts_aint_t) -size);
    ASSERT(qsz + size > qsz);
    flags = erts_atomic32_read_nob(&pp->sched.flags);
    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
	return;
    if (qsz < (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->low))
	check_unset_busy_port_q(pp, flags, bpq);
}

static ERTS_INLINE erts_aint32_t
enqueue_proc2port_data(Port *pp,
		       ErtsProc2PortSigData *sigdp,
		       erts_aint32_t flags)
{
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
    if (sigdp && bpq) {
	ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
	if (size) {
	    erts_aint_t asize = erts_atomic_add_read_acqb(&bpq->size,
							      (erts_aint_t) size);
	    ErlDrvSizeT qsz = (ErlDrvSizeT) asize;

	    ASSERT(qsz - size < qsz);

	    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
		flags = erts_atomic32_read_bor_acqb(&pp->sched.flags,
							ERTS_PTS_FLG_BUSY_PORT_Q);
		flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
		qsz = (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->size);
		if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low)) {
		    flags = (erts_atomic32_read_bor_relb(
				 &pp->sched.flags,
				 ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
		    flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
		}
	    }
	    ASSERT(!(flags & ERTS_PTS_FLG_EXIT));
	}
    }
    return flags;
}

/*
 * erl_drv_busy_msgq_limits() is called by drivers either reading or
 * writing the limits.
 *
 * A limit of zero is interpreted as a read-only request (using a
 * limit of zero would not be useful). Other values are interpreted
 * as a write-read request.
 */
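
/*
 * Illustrative example (not part of this file): a driver that wants a
 * 4/8 KB low/high watermark pair could call, e.g. from its start
 * callback:
 *
 *     ErlDrvSizeT low = 4096, high = 8192;
 *     erl_drv_busy_msgq_limits(port, &low, &high);
 *
 * and later read back the active limits by calling the function again
 * with both values set to zero.
 */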

void
erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
{
    Port *pp = erts_drvport2port(dport);
    ErtsPortTaskBusyPortQ *bpq;
    int written = 0, resume_procs = 0;
    ErlDrvSizeT low, high;

    if (pp == ERTS_INVALID_ERL_DRV_PORT || !(bpq = pp->sched.taskq.bpq)) {
	if (lowp)
	    *lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
	if (highp)
	    *highp = ERL_DRV_BUSY_MSGQ_DISABLED;
	return;
    }

    low = lowp ? *lowp : 0;
    high = highp ? *highp : 0;

    erts_port_task_sched_lock(&pp->sched);

    if (low == ERL_DRV_BUSY_MSGQ_DISABLED
	|| high == ERL_DRV_BUSY_MSGQ_DISABLED) {
	/* Disable busy msgq feature */
	erts_aint32_t flags;
	pp->sched.taskq.bpq = NULL;
	flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
	flags = erts_atomic32_read_band_acqb(&pp->sched.flags, flags);
	if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
	    resume_procs = 1;
    }
    else {

	if (!low)
	    low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
	else {
	    if (bpq->high < low)
		bpq->high = low;
	    erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
	    written = 1;
	}

	if (!high)
	    high = bpq->high;
	else {
	    if (low > high) {
		low = high;
		erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
	    }
	    bpq->high = high;
	    written = 1;
	}

	if (written) {
	    ErlDrvSizeT size = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
	    if (size > high)
		erts_atomic32_read_bor_relb(&pp->sched.flags,
						ERTS_PTS_FLG_BUSY_PORT_Q);
	    else if (size < low)
		erts_atomic32_read_bor_relb(&pp->sched.flags,
						ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
	}
    }

    erts_port_task_sched_unlock(&pp->sched);

    if (resume_procs)
	erts_port_resume_procs(pp);
    if (lowp)
	*lowp = low;
    if (highp)
	*highp = high;
}

/*
 * No-suspend handles.
 */
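
/*
 * Signals scheduled with ERTS_PT_FLG_NOSUSPEND get their task handles
 * linked into pp->sched.taskq.local.busy.nosuspend. If the port becomes
 * busy, such tasks are aborted via abort_nosuspend_task() instead of
 * suspending the sending process.
 */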

static void
free_port_task_handle_list(void *vpthlp)
{
    erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
}

static void
schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
{
    erts_schedule_thr_prgr_later_cleanup_op(free_port_task_handle_list,
					    (void *) pthlp,
					    &pthlp->u.release,
					    sizeof(ErtsPortTaskHandleList));
}

static ERTS_INLINE void
abort_signal_task(Port *pp,
                  int abort_type,
                  ErtsPortTaskType type,
                  ErtsPortTaskTypeData *tdp,
                  int bpq_data)
{

    ASSERT(type == ERTS_PORT_TASK_PROC_SIG);

    if (!bpq_data)
	tdp->psig.callback(NULL,
			   ERTS_PORT_SFLG_INVALID,
			   abort_type,
			   &tdp->psig.data);
    else {
	ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data);
	tdp->psig.callback(NULL,
			   ERTS_PORT_SFLG_INVALID,
			   abort_type,
			   &tdp->psig.data);
	aborted_proc2port_data(pp, size);
    }
}


static ERTS_INLINE void
abort_nosuspend_task(Port *pp,
		     ErtsPortTaskType type,
		     ErtsPortTaskTypeData *tdp,
		     int bpq_data)
{
    abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, type, tdp, bpq_data);
}

static ErtsPortTaskHandleList *
get_free_nosuspend_handles(Port *pp)
{
    ErtsPortTaskHandleList *nshp, *last_nshp = NULL;

    ERTS_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched));

    nshp = pp->sched.taskq.local.busy.nosuspend;

    while (nshp && !erts_port_task_is_scheduled(&nshp->handle)) {
	last_nshp = nshp;
	nshp = nshp->u.next;
    }

    if (!last_nshp)
	nshp = NULL;
    else {
	nshp = pp->sched.taskq.local.busy.nosuspend;
	pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next;
	last_nshp->u.next = NULL;
	if (!pp->sched.taskq.local.busy.nosuspend)
	    erts_atomic32_read_band_nob(&pp->sched.flags,
					    ~ERTS_PTS_FLG_HAVE_NS_TASKS);
    }
    return nshp;
}

static void
free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp)
{
    while (free_nshp) {
	ErtsPortTaskHandleList *nshp = free_nshp;
	free_nshp = free_nshp->u.next;
	schedule_port_task_handle_list_free(nshp);
    }
}

/*
 * Port queue operations
 */

static ERTS_INLINE void
enqueue_port(ErtsRunQueue *runq, Port *pp)
{
    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
    pp->sched.next = NULL;
    if (runq->ports.end) {
	ASSERT(runq->ports.start);
	runq->ports.end->sched.next = pp;
    }
    else {
	ASSERT(!runq->ports.start);
	runq->ports.start = pp;
    }

    runq->ports.end = pp;
    ASSERT(runq->ports.start && runq->ports.end);

    erts_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);

    if (ERTS_RUNQ_FLGS_GET_NOB(runq) & ERTS_RUNQ_FLG_HALTING)
	erts_non_empty_runq(runq);
}

static ERTS_INLINE Port *
pop_port(ErtsRunQueue *runq)
{
    Port *pp = runq->ports.start;
    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
    if (!pp) {
	ASSERT(!runq->ports.end);
    }
    else {
	runq->ports.start = runq->ports.start->sched.next;
	if (!runq->ports.start) {
	    ASSERT(runq->ports.end == pp);
	    runq->ports.end = NULL;
	}
	erts_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
    }

    ASSERT(runq->ports.start || !runq->ports.end);
    ASSERT(runq->ports.end || !runq->ports.start);
    return pp;
}

/*
 * Task queue operations
 */
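
/*
 * Tasks are enqueued on the "in" queue under the port task scheduler
 * lock and later fetched in bulk (fetch_in_queue()) into the local
 * queue, which is only touched by the scheduler currently executing
 * the port. This keeps the lock out of the execution fast path.
 */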

static ERTS_INLINE int
enqueue_task(Port *pp,
	     ErtsPortTask *ptp,
	     ErtsProc2PortSigData *sigdp,
	     ErtsPortTaskHandleList *ns_pthlp,
	     erts_aint32_t *flagsp)

{
    int res;
    erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT;
    erts_aint32_t flags;
    ptp->u.alive.next = NULL;
    if (ns_pthlp)
	fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
    erts_port_task_sched_lock(&pp->sched);
    flags = erts_atomic32_read_nob(&pp->sched.flags);
    if (flags & fail_flags)
	res = 0;
    else {
	if (ns_pthlp) {
	    ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
	    pp->sched.taskq.local.busy.nosuspend = ns_pthlp;
	}
	if (pp->sched.taskq.in.last) {
	    ASSERT(pp->sched.taskq.in.first);
	    ASSERT(!pp->sched.taskq.in.last->u.alive.next);

	    pp->sched.taskq.in.last->u.alive.next = ptp;
	}
	else {
	    ASSERT(!pp->sched.taskq.in.first);

	    pp->sched.taskq.in.first = ptp;
	}
	pp->sched.taskq.in.last = ptp;
	flags = enqueue_proc2port_data(pp, sigdp, flags);
	res = 1;
    }
    erts_port_task_sched_unlock(&pp->sched);
    *flagsp = flags;
    return res;
}

static ERTS_INLINE void
prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    erts_aint32_t act = erts_atomic32_read_nob(&pp->sched.flags);

    if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) {
	*execqp = pp->sched.taskq.local.first;
	*processing_busy_q_p = 0;
    }
    else {
	*execqp = pp->sched.taskq.local.busy.first;
	*processing_busy_q_p = 1;
    }

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    while (1) {
	erts_aint32_t new, exp;

	new = exp = act;

	new &= ~ERTS_PTS_FLG_IN_RUNQ;
	new |= ERTS_PTS_FLG_EXEC;

	act = erts_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp);

	ASSERT(act & ERTS_PTS_FLG_IN_RUNQ);

	if (exp == act)
	    break;
    }
}

/* finalize_exec() returns a non-zero value if the port should remain active */
static ERTS_INLINE int
finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
{
    erts_aint32_t act;
    unsigned int prof_runnable_ports;

    if (!processing_busy_q)
	pp->sched.taskq.local.first = *execq;
    else {
	pp->sched.taskq.local.busy.first = *execq;
	ASSERT(*execq);
    }

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execq, processing_busy_q);

    *execq = NULL;

    act = erts_atomic32_read_nob(&pp->sched.flags);
    if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
	act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);

    prof_runnable_ports = erts_system_profile_flags.runnable_ports;
    if (prof_runnable_ports)
	erts_port_task_sched_lock(&pp->sched);

    while (1) {
	erts_aint32_t new, exp;

	new = exp = act;

	new &= ~ERTS_PTS_FLG_EXEC;
	if (act & ERTS_PTS_FLG_HAVE_TASKS)
	    new |= ERTS_PTS_FLG_IN_RUNQ;

	act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);

	ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ));
	ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_EXEC_IMM));

	if (exp == act)
	    break;
    }

    if (prof_runnable_ports | IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
	/* trace port scheduling, out */
	if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS))
	    trace_sched_ports(pp, am_out);
	if (prof_runnable_ports) {
	    if (!(act & (ERTS_PTS_FLG_EXEC_IMM|ERTS_PTS_FLG_HAVE_TASKS)))
		profile_runnable_port(pp, am_inactive);
	    erts_port_task_sched_unlock(&pp->sched);
	}
    }

    return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0;
}

static ERTS_INLINE erts_aint32_t
select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    erts_aint32_t flags = erts_atomic32_read_nob(&pp->sched.flags);

    if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
	flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    if (flags & ERTS_PTS_FLG_BUSY_PORT) {
	if (*processing_busy_q_p) {
	    ErtsPortTask *ptp;

	    ptp = pp->sched.taskq.local.busy.first = *execqp;
	    if (!ptp)
		pp->sched.taskq.local.busy.last = NULL;
	    else if (!(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY))
		no_sig_dep_move_from_busyq(pp);

	    *execqp = pp->sched.taskq.local.first;
	    *processing_busy_q_p = 0;

	    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
	}

	return flags;
    }

    /* Not busy */

    if (!*processing_busy_q_p && pp->sched.taskq.local.busy.first) {
	pp->sched.taskq.local.first = *execqp;
	*execqp = pp->sched.taskq.local.busy.first;
	*processing_busy_q_p = 1;

	ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
    }

    return flags;
}

/*
 * check_task_for_exec() returns a non-zero value if the task
 * is ok to execute, and 0 otherwise.
 */
static ERTS_INLINE int
check_task_for_exec(Port *pp,
		    erts_aint32_t flags,
		    ErtsPortTask **execqp,
		    int *processing_busy_q_p,
		    ErtsPortTask *ptp)
{

    if (!*processing_busy_q_p) {
	/* Processing normal queue */

	ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);

	if ((flags & ERTS_PTS_FLG_BUSY_PORT)
	    && (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) {

	    busy_wait_move_to_busy_queue(pp, ptp);
	    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

	    return 0;
	}

	if (pp->sched.taskq.local.busy.last
	    && (ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP)) {

	    int res = !check_sig_dep_move_to_busy_queue(pp, ptp);
	    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

	    return res;
	}

    }
    else {
	/* Processing busy queue */

	ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT));

	ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);

	popped_from_busy_queue(pp, ptp, !*execqp);

	if (!*execqp) {
	    *execqp = pp->sched.taskq.local.first;
	    *processing_busy_q_p = 0;
	}

	ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    }

    return 1;
}

static ErtsPortTask *
fetch_in_queue(Port *pp, ErtsPortTask **execqp)
{
    ErtsPortTask *ptp;
    ErtsPortTaskHandleList *free_nshp = NULL;

    erts_port_task_sched_lock(&pp->sched);

    ptp = pp->sched.taskq.in.first;
    pp->sched.taskq.in.first = NULL;
    pp->sched.taskq.in.last = NULL;
    if (ptp)
	*execqp = ptp->u.alive.next;
    else
	erts_atomic32_read_band_nob(&pp->sched.flags,
					~ERTS_PTS_FLG_HAVE_TASKS);


    if (pp->sched.taskq.local.busy.nosuspend)
	free_nshp = get_free_nosuspend_handles(pp);

    erts_port_task_sched_unlock(&pp->sched);

    if (free_nshp)
	free_nosuspend_handles(free_nshp);

    return ptp;
}

static ERTS_INLINE ErtsPortTask *
select_task_for_exec(Port *pp,
		     ErtsPortTask **execqp,
		     int *processing_busy_q_p)
{
    ErtsPortTask *ptp;
    erts_aint32_t flags;

    flags = select_queue_for_exec(pp, execqp, processing_busy_q_p);

    while (1) {
	ptp = *execqp;
	if (ptp)
	    *execqp = ptp->u.alive.next;
	else {
	    ptp = fetch_in_queue(pp, execqp);
	    if (!ptp)
		return NULL;
	}
	if (check_task_for_exec(pp, flags, execqp, processing_busy_q_p, ptp))
	    return ptp;
    }
}

/*
 * Cut time slice
 */

int
erl_drv_consume_timeslice(ErlDrvPort dprt, int percent)
{
    Port *pp = erts_drvport2port(dprt);
    if (pp == ERTS_INVALID_ERL_DRV_PORT)
	return -1;
    if (percent < 1)
	percent = 1;
    else if (100 < percent)
	percent = 100;
    pp->reds += percent*((CONTEXT_REDS+99)/100);
    if (pp->reds < CONTEXT_REDS)
	return 0;
    pp->reds = CONTEXT_REDS;
    return 1;
}
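
/*
 * Illustrative example (not part of this file): a driver callback doing
 * chunked work can yield cooperatively once its time slice is used up:
 *
 *     while (have_more_work(state)) {          // hypothetical helper
 *         do_some_work(state);                 // hypothetical helper
 *         if (erl_drv_consume_timeslice(port, 10))
 *             break; // slice consumed; continue on next scheduling
 *     }
 */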

void
erts_port_task_tmp_handle_detach(ErtsPortTaskHandle *pthp)
{
    ERTS_LC_ASSERT(erts_thr_progress_lc_is_delaying());
    reset_port_task_handle(pthp);
}

/*
 * Abort a scheduled task.
 */
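
/*
 * Aborting races with execution: the compare-and-swap from SCHEDULED to
 * ABORTED either wins, in which case the executing scheduler will skip
 * the task, or loses, in which case -1 is returned and the task runs
 * (or was already aborted) as usual.
 */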

int
erts_port_task_abort(ErtsPortTaskHandle *pthp)
{
    int res;
    ErtsPortTask *ptp;
    ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();

    ptp = handle2task(pthp);
    if (!ptp)
	res = -1;
    else {
	erts_aint32_t old_state;

#ifdef DEBUG
	ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle;
	ERTS_THR_READ_MEMORY_BARRIER;
	old_state = erts_atomic32_read_nob(&ptp->state);
	if (old_state == ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(!saved_pthp || saved_pthp == pthp);
	}
#endif

	old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
						  ERTS_PT_STATE_ABORTED,
						  ERTS_PT_STATE_SCHEDULED);
	if (old_state != ERTS_PT_STATE_SCHEDULED)
	    res = -1; /* Task already aborted, executing, or executed */
	else {
	    reset_port_task_handle(pthp);

#if ERTS_POLL_USE_SCHEDULER_POLLING
            switch (ptp->type) {
	    case ERTS_PORT_TASK_INPUT:
	    case ERTS_PORT_TASK_OUTPUT:
                if (ptp->u.alive.td.io.is_scheduler_event) {
                    ASSERT(erts_atomic_read_nob(
                               &erts_port_task_outstanding_io_tasks) > 0);
                    erts_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
                }
		break;
	    default:
		break;
	    }
#endif

	    res = 0;
	}
    }

    erts_thr_progress_unmanaged_continue(dhndl);

    return res;
}

void
erts_port_task_abort_nosuspend_tasks(Port *pp)
{
    ErtsPortTaskHandleList *abort_list;
    ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;

    erts_port_task_sched_lock(&pp->sched);
    erts_atomic32_read_band_nob(&pp->sched.flags,
				    ~ERTS_PTS_FLG_HAVE_NS_TASKS);
    abort_list = pp->sched.taskq.local.busy.nosuspend;
    pp->sched.taskq.local.busy.nosuspend = NULL;
    erts_port_task_sched_unlock(&pp->sched);

    while (abort_list) {
#ifdef DEBUG
	ErtsPortTaskHandle *saved_pthp;
#endif
	ErtsPortTaskType type;
	ErtsPortTaskTypeData td;
	ErtsPortTaskHandle *pthp;
	ErtsPortTask *ptp;
	ErtsPortTaskHandleList *pthlp;
	erts_aint32_t old_state;

	pthlp = abort_list;
	abort_list = pthlp->u.next;

	if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	    dhndl = erts_thr_progress_unmanaged_delay();

	pthp = &pthlp->handle;
	ptp = handle2task(pthp);
	if (!ptp) {
	    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
		erts_thr_progress_unmanaged_continue(dhndl);
	    schedule_port_task_handle_list_free(pthlp);
	    continue;
	}

#ifdef DEBUG
	saved_pthp = ptp->u.alive.handle;
	ERTS_THR_READ_MEMORY_BARRIER;
	old_state = erts_atomic32_read_nob(&ptp->state);
	if (old_state == ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(saved_pthp == pthp);
	}
#endif

	old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
						  ERTS_PT_STATE_ABORTED,
						  ERTS_PT_STATE_SCHEDULED);
	if (old_state != ERTS_PT_STATE_SCHEDULED) {
	    /* Task already aborted, executing, or executed */
	    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
		erts_thr_progress_unmanaged_continue(dhndl);
	    schedule_port_task_handle_list_free(pthlp);
	    continue;
	}

	reset_port_task_handle(pthp);

	type = ptp->type;
	td = ptp->u.alive.td;

	if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	    erts_thr_progress_unmanaged_continue(dhndl);
	schedule_port_task_handle_list_free(pthlp);

	abort_nosuspend_task(pp, type, &td, pp->sched.taskq.bpq != NULL);
    }
}

/*
 * Schedule a task.
 */
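
/*
 * Scheduling a task is done in three steps: allocate and initialize the
 * task, enqueue it on the port's "in" queue, and finally, unless the
 * port is already in a run queue or executing, enqueue the port itself
 * on a run queue (possibly emigrating it first).
 */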

int
erts_port_task_schedule(Eterm id,
			ErtsPortTaskHandle *pthp,
			ErtsPortTaskType type,
			...)
{
    ErtsProc2PortSigData *sigdp = NULL;
    ErtsPortTaskHandleList *ns_pthlp = NULL;
    ErtsRunQueue *xrunq;
    ErtsThrPrgrDelayHandle dhndl;
    ErtsRunQueue *runq;
    Port *pp;
    ErtsPortTask *ptp = NULL;
    erts_aint32_t act, add_flags;
    unsigned int prof_runnable_ports;

    ERTS_LC_ASSERT(!pthp || !erts_port_task_is_scheduled(pthp));

    ASSERT(is_internal_port(id));

    dhndl = erts_thr_progress_unmanaged_delay();

    pp = erts_port_lookup_raw(id);

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
	if (pp)
	    erts_port_inc_refc(pp);
	erts_thr_progress_unmanaged_continue(dhndl);
    }

    if (type != ERTS_PORT_TASK_PROC_SIG) {
        if (!pp)
            goto fail;

	ptp = port_task_alloc();

	ptp->type = type;
	ptp->u.alive.flags = 0;

	erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);

	set_handle(ptp, pthp);
    }

    switch (type) {
    case ERTS_PORT_TASK_INPUT:
    case ERTS_PORT_TASK_OUTPUT: {
	va_list argp;
	va_start(argp, type);
	ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
#if ERTS_POLL_USE_SCHEDULER_POLLING
        ptp->u.alive.td.io.is_scheduler_event = va_arg(argp, int);
#endif
	va_end(argp);
#if ERTS_POLL_USE_SCHEDULER_POLLING
        if (ptp->u.alive.td.io.is_scheduler_event)
            erts_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
#endif
	break;
    }
    case ERTS_PORT_TASK_PROC_SIG: {
	va_list argp;
	va_start(argp, type);
	sigdp = va_arg(argp, ErtsProc2PortSigData *);
	ptp = p2p_sig_data_to_task(sigdp);
	ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
	ptp->u.alive.flags |= va_arg(argp, int);
	va_end(argp);
        if (!pp)
            goto fail;

	if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND))
	    set_tmp_handle(ptp, pthp);
	else {
	    ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
				  sizeof(ErtsPortTaskHandleList));
	    set_handle(ptp, &ns_pthlp->handle);
	}
	break;
    }
    default:
	break;
    }

    if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
	reset_handle(ptp);
	if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
	    goto abort_nosuspend;
	else
	    goto fail;
    }

    add_flags = ERTS_PTS_FLG_HAVE_TASKS;
    if (ns_pthlp)
	add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS;

    prof_runnable_ports = erts_system_profile_flags.runnable_ports;
    if (prof_runnable_ports)
	erts_port_task_sched_lock(&pp->sched);

    while (1) {
	erts_aint32_t new, exp;

	if ((act & add_flags) == add_flags
	    && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	    goto done; /* Done */

	new = exp = act;
	new |= add_flags;
	if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	    new |= ERTS_PTS_FLG_IN_RUNQ;

	act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);

	if (exp == act) {
	    if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
		break; /* Need to enqueue port */
	    goto done; /* Done */
	}

	if (act & ERTS_PTS_FLG_EXIT)
	    goto done; /* Died after our task insert... */
    }

    if (prof_runnable_ports) {
	if (!(act & ERTS_PTS_FLG_EXEC_IMM))
	    profile_runnable_port(pp, am_active);
	erts_port_task_sched_unlock(&pp->sched);
	prof_runnable_ports = 0;
    }

    /* Enqueue port on run-queue */

    runq = erts_port_runq(pp);

    xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
    ERTS_LC_ASSERT(runq != xrunq);
    ERTS_LC_VERIFY_RQ(runq, pp);
    if (xrunq) {
	/* Emigrate port ... */
        erts_set_runq_port(pp, xrunq);
	erts_runq_unlock(runq);
	runq = erts_port_runq(pp);
    }

    enqueue_port(runq, pp);

    erts_runq_unlock(runq);

    erts_notify_inc_runq(runq);

done:

    if (prof_runnable_ports)
	erts_port_task_sched_unlock(&pp->sched);

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);

    return 0;

abort_nosuspend:

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);

    abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td, 0);

    ASSERT(ns_pthlp);
    erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);

    ASSERT(ptp);
    port_task_free(ptp);

    return 0;

fail:

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);

    if (ptp) {
        if (ptp->type == ERTS_PORT_TASK_PROC_SIG)
            abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT,
                              ptp->type, &ptp->u.alive.td, 0);
	port_task_free(ptp);
    }

    if (ns_pthlp)
	erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);

    return -1;
}

void
erts_port_task_free_port(Port *pp)
{
    erts_aint32_t flags;
    ErtsRunQueue *runq;

    ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
    ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));

    runq = erts_port_runq(pp);
    erts_port_task_sched_lock(&pp->sched);
    flags = erts_atomic32_read_bor_relb(&pp->sched.flags,
					    ERTS_PTS_FLG_EXIT);
    erts_port_task_sched_unlock(&pp->sched);
    erts_atomic32_read_bset_relb(&pp->state,
				 (ERTS_PORT_SFLG_CONNECTED
				  | ERTS_PORT_SFLG_EXITING
				  | ERTS_PORT_SFLG_CLOSING
				  | ERTS_PORT_SFLG_FREE),
				 ERTS_PORT_SFLG_FREE);

    erts_runq_unlock(runq);

    if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	begin_port_cleanup(pp, NULL, NULL);
}

/*
 * Execute scheduled tasks of a port.
 *
 * erts_port_task_execute() is called by scheduler threads between
 * scheduling of processes. The run-queue lock should be held by the
 * caller.
 */

void
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
    Port *pp;
    ErtsPortTask *execq;
    int processing_busy_q;
    int vreds = 0;
    int reds = 0;
    erts_aint32_t state;
    int active;
    Uint64 start_time = 0;
    ErtsSchedulerData *esdp = runq->scheduler;
#if ERTS_POLL_USE_SCHEDULER_POLLING
    erts_aint_t io_tasks_executed = 0;
#endif
    ERTS_MSACC_PUSH_STATE_M();

    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));

    pp = pop_port(runq);
    if (!pp) {
	goto done;
    }

    ERTS_LC_VERIFY_RQ(runq, pp);

    erts_runq_unlock(runq);

    *curr_port_pp = pp;

    if (erts_sched_stat.enabled) {
	Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no);
	int migrated = old && old != esdp->no;

	erts_spin_lock(&erts_sched_stat.lock);
	erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++;
	erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++;
	if (migrated) {
	    erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++;
	    erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++;
	}
	erts_spin_unlock(&erts_sched_stat.lock);
    }

    prepare_exec(pp, &execq, &processing_busy_q);

    erts_port_lock(pp);

    /* trace port scheduling, in */
    if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
	trace_sched_ports(pp, am_in);
    }

    state = erts_atomic32_read_nob(&pp->state);
    pp->reds = ERTS_PORT_REDS_EXECUTE;
    ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
    goto begin_handle_tasks;
1715 
1716     while (1) {
1717 	erts_aint32_t task_state;
1718 	ErtsPortTask *ptp;
1719 
1720 	ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
1721 	if (!ptp)
1722 	    break;
1723 
1724 	task_state = erts_atomic32_cmpxchg_nob(&ptp->state,
1725 						   ERTS_PT_STATE_EXECUTING,
1726 						   ERTS_PT_STATE_SCHEDULED);
1727 	if (task_state != ERTS_PT_STATE_SCHEDULED) {
1728 	    ASSERT(task_state == ERTS_PT_STATE_ABORTED);
1729 	    goto aborted_port_task;
1730 	}
1731 
1732 	if (erts_system_monitor_long_schedule != 0) {
1733 	    start_time = erts_timestamp_millis();
1734 	}
1735 
1736 	ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
1737 	ERTS_CHK_NO_PROC_LOCKS;
1738 	ASSERT(pp->drv_ptr);
1739 
1740 	switch (ptp->type) {
1741 	case ERTS_PORT_TASK_TIMEOUT:
1742 	    reset_handle(ptp);
1743 	    if (!ERTS_PTMR_IS_TIMED_OUT(pp))
1744 		reds = 0;
1745 	    else {
1746 		ERTS_PTMR_CLEAR(pp);
1747 		reds = ERTS_PORT_REDS_TIMEOUT;
1748 		if (!(state & ERTS_PORT_SFLGS_DEAD)) {
1749 		    DTRACE_DRIVER(driver_timeout, pp);
1750 		    LTTNG_DRIVER(driver_timeout, pp);
1751                     if (IS_TRACED_FL(pp, F_TRACE_RECEIVE))
1752                         trace_port(pp, am_receive, am_timeout);
1753 		    (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
1754 		}
1755 	    }
1756 	    break;
1757 	case ERTS_PORT_TASK_INPUT:
1758 	    reds = ERTS_PORT_REDS_INPUT;
1759 	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
1760             DTRACE_DRIVER(driver_ready_input, pp);
1761             LTTNG_DRIVER(driver_ready_input, pp);
1762 	    /* NOTE some windows drivers use ->ready_input
1763 	       for input and output */
1764 	    (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data,
1765 					ptp->u.alive.td.io.event);
1766 	    reset_executed_io_task_handle(pp, ptp);
1767 #if ERTS_POLL_USE_SCHEDULER_POLLING
1768             if (ptp->u.alive.td.io.is_scheduler_event)
1769                 io_tasks_executed++;
1770 #endif
1771 	    break;
1772 	case ERTS_PORT_TASK_OUTPUT:
1773 	    reds = ERTS_PORT_REDS_OUTPUT;
1774 	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
1775             DTRACE_DRIVER(driver_ready_output, pp);
1776             LTTNG_DRIVER(driver_ready_output, pp);
1777 	    (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
1778 					 ptp->u.alive.td.io.event);
1779 	    reset_executed_io_task_handle(pp, ptp);
1780 #if ERTS_POLL_USE_SCHEDULER_POLLING
1781             if (ptp->u.alive.td.io.is_scheduler_event)
1782                 io_tasks_executed++;
1783 #endif
1784 	    break;
1785 	case ERTS_PORT_TASK_PROC_SIG: {
1786 	    ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
1787 	    reset_handle(ptp);
1788 	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
1789 	    if (!pp->sched.taskq.bpq)
1790 		reds = ptp->u.alive.td.psig.callback(pp,
1791 						     state,
1792 						     ERTS_PROC2PORT_SIG_EXEC,
1793 						     sigdp);
1794 	    else {
1795 		ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
1796 		reds = ptp->u.alive.td.psig.callback(pp,
1797 						     state,
1798 						     ERTS_PROC2PORT_SIG_EXEC,
1799 						     sigdp);
1800 		dequeued_proc2port_data(pp, size);
1801 	    }
1802 	    break;
1803 	}
1804 	case ERTS_PORT_TASK_DIST_CMD:
1805 	    reset_handle(ptp);
1806 	    reds = erts_dist_command(pp, CONTEXT_REDS - pp->reds);
1807 	    break;
1808 	default:
1809 	    erts_exit(ERTS_ABORT_EXIT,
1810 		     "Invalid port task type: %d\n",
1811 		     (int) ptp->type);
1812 	    break;
1813 	}
1814 
	reds += erts_port_driver_callback_epilogue(pp, &state);

	if (start_time != 0) {
	    Sint64 diff = erts_timestamp_millis() - start_time;
	    if (diff > 0 && (Uint) diff > erts_system_monitor_long_schedule) {
		monitor_long_schedule_port(pp, ptp->type, (Uint) diff);
	    }
	}
	start_time = 0;

    aborted_port_task:
	schedule_port_task_free(ptp);

    begin_handle_tasks:
	if (state & ERTS_PORT_SFLG_FREE) {
	    reds += ERTS_PORT_REDS_FREE;
	    begin_port_cleanup(pp, &execq, &processing_busy_q);

	    break;
	}

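	/* Each executed callback also costs a number of "virtual"
	 * reductions on top of what the driver reported; this bounds
	 * the number of callback invocations that fit in one scheduled
	 * slot (see ERTS_PORT_CALLBACK_VREDS). */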
	vreds += ERTS_PORT_CALLBACK_VREDS;
	reds += ERTS_PORT_CALLBACK_VREDS;

	pp->reds += reds;
	reds = 0;

	if (pp->reds >= CONTEXT_REDS)
	    break;
    }

    ERTS_MSACC_POP_STATE_M();

#if ERTS_POLL_USE_SCHEDULER_POLLING
    if (io_tasks_executed) {
        ASSERT(erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
	       >= io_tasks_executed);
        erts_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
			     -1*io_tasks_executed);
    }
#endif

    ASSERT(runq == erts_get_runq_port(pp));

    active = finalize_exec(pp, &execq, processing_busy_q);

    reds = pp->reds - vreds;

    erts_port_release(pp);

    *curr_port_pp = NULL;

    erts_runq_lock(runq);

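    /* If the port still has work to do, put it back on a run queue,
     * possibly emigrating it to another run queue if the load
     * balancing check says so. */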
    if (active) {
	ErtsRunQueue *xrunq;

        ASSERT(!(erts_atomic32_read_nob(&pp->state)
                 & ERTS_PORT_SFLG_INITIALIZING));

	xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
	ERTS_LC_ASSERT(runq != xrunq);
	ERTS_LC_VERIFY_RQ(runq, pp);
	if (!xrunq) {
	    enqueue_port(runq, pp);
	    /* No need to notify ourselves about inc in runq. */
	}
	else {
	    /* Emigrate port... */
            erts_set_runq_port(pp, xrunq);
	    erts_runq_unlock(runq);

	    xrunq = erts_port_runq(pp);
	    enqueue_port(xrunq, pp);
	    erts_runq_unlock(xrunq);
	    erts_notify_inc_runq(xrunq);

	    erts_runq_lock(runq);
	}
    }

 done:

    runq->scheduler->reductions += reds;

    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
    ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds);
}

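/* Thread-progress later op: decrement the port's reference count once
 * thread progress guarantees that no scheduler can still be
 * referencing it. */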
static void
release_port(void *vport)
{
    erts_port_dec_refc((Port *) vport);
}

static void
schedule_release_port(void *vport)
{
    Port *pp = (Port *) vport;
    /* Only used when the port release was ordered from a non-scheduler
     * thread; defer the actual release to a thread-progress later op. */
    erts_schedule_thr_prgr_later_op(release_port,
				    (void *) pp,
				    &pp->common.u.release);
}


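/* Abort all tasks remaining in the port's task queues and schedule
 * the port structure for release. Called when the port is being
 * freed; execqp is NULL when there is no executing queue to take
 * tasks from. */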
static void
begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    int i, max;
    ErtsPortTaskBusyCallerTable *tabp;
    ErtsPortTask *qs[3];
    ErtsPortTaskHandleList *free_nshp = NULL;
    ErtsProcList *plp;

    ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));

    /*
     * Abort remaining tasks...
     *
     * We want to process the queues in the following order to
     * preserve the signal ordering guarantees:
     *  1. Local busy queue
     *  2. Local queue
     *  3. In queue
     */

    max = 0;
    if (!execqp) {
	if (pp->sched.taskq.local.busy.first)
	    qs[max++] = pp->sched.taskq.local.busy.first;
	if (pp->sched.taskq.local.first)
	    qs[max++] = pp->sched.taskq.local.first;
    }
    else {
	if (*processing_busy_q_p) {
	    if (*execqp)
		qs[max++] = *execqp;
	    if (pp->sched.taskq.local.first)
		qs[max++] = pp->sched.taskq.local.first;
	}
	else {
	    if (pp->sched.taskq.local.busy.first)
		qs[max++] = pp->sched.taskq.local.busy.first;
	    if (*execqp)
		qs[max++] = *execqp;
	}
	*execqp = NULL;
	*processing_busy_q_p = 0;
    }
    pp->sched.taskq.local.busy.first = NULL;
    pp->sched.taskq.local.busy.last = NULL;
    pp->sched.taskq.local.first = NULL;
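    /* Free the busy-caller table, including any heap-allocated bucket
     * entries (the pre-allocated entry lives inside the table itself
     * and must not be freed separately). */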
    tabp = pp->sched.taskq.local.busy.table;
    if (tabp) {
	int bix;
	for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
	    ErtsPortTaskBusyCaller *bcp = tabp->bucket[bix];
	    while (bcp) {
		ErtsPortTaskBusyCaller *free_bcp = bcp;
		bcp = bcp->next;
		if (free_bcp != &tabp->pre_alloc_busy_caller)
		    erts_free(ERTS_ALC_T_BUSY_CALLER, free_bcp);
	    }
	}

	busy_caller_table_free(tabp);
	pp->sched.taskq.local.busy.table = NULL;
    }

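    /* Detach the in queue under the scheduling lock and append it
     * last, after the local queues. */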
    erts_port_task_sched_lock(&pp->sched);
    qs[max] = pp->sched.taskq.in.first;
    pp->sched.taskq.in.first = NULL;
    pp->sched.taskq.in.last = NULL;
    erts_port_task_sched_unlock(&pp->sched);
    if (qs[max])
	max++;

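    /* Walk all collected queues and abort every task. Tasks already
     * marked as aborted are just freed; still-scheduled tasks are
     * first claimed via cmpxchg so that a concurrent abort cannot
     * process the same task twice. */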
    for (i = 0; i < max; i++) {
	while (1) {
	    erts_aint32_t state;
	    ErtsPortTask *ptp = qs[i];
	    if (!ptp)
		break;

	    qs[i] = ptp->u.alive.next;

	    /* Normal case here is aborted tasks... */
	    state = erts_atomic32_read_nob(&ptp->state);
	    if (state == ERTS_PT_STATE_ABORTED)
		goto aborted_port_task;

	    state = erts_atomic32_cmpxchg_nob(&ptp->state,
					      ERTS_PT_STATE_EXECUTING,
					      ERTS_PT_STATE_SCHEDULED);
	    if (state != ERTS_PT_STATE_SCHEDULED) {
		ASSERT(state == ERTS_PT_STATE_ABORTED);
		goto aborted_port_task;
	    }

	    reset_handle(ptp);

	    switch (ptp->type) {
	    case ERTS_PORT_TASK_TIMEOUT:
		break;
	    case ERTS_PORT_TASK_INPUT:
		erts_stale_drv_select(pp->common.id,
				      ERTS_Port2ErlDrvPort(pp),
				      ptp->u.alive.td.io.event,
				      DO_READ,
				      1);
		break;
	    case ERTS_PORT_TASK_OUTPUT:
		erts_stale_drv_select(pp->common.id,
				      ERTS_Port2ErlDrvPort(pp),
				      ptp->u.alive.td.io.event,
				      DO_WRITE,
				      1);
		break;
	    case ERTS_PORT_TASK_DIST_CMD:
		break;
	    case ERTS_PORT_TASK_PROC_SIG: {
		ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
		if (!pp->sched.taskq.bpq)
		    ptp->u.alive.td.psig.callback(NULL,
						  ERTS_PORT_SFLG_INVALID,
						  ERTS_PROC2PORT_SIG_ABORT_CLOSED,
						  sigdp);
		else {
		    ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
		    ptp->u.alive.td.psig.callback(NULL,
						  ERTS_PORT_SFLG_INVALID,
						  ERTS_PROC2PORT_SIG_ABORT_CLOSED,
						  sigdp);
		    aborted_proc2port_data(pp, size);
		}
		break;
	    }
	    default:
		erts_exit(ERTS_ABORT_EXIT,
			  "Invalid port task type: %d\n",
			  (int) ptp->type);
	    }

	aborted_port_task:
	    schedule_port_task_free(ptp);
	}
    }

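    /* All tasks are gone; clear the task and busy flags on the port
     * scheduler state. */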
    erts_atomic32_read_band_nob(&pp->sched.flags,
				~(ERTS_PTS_FLG_HAVE_BUSY_TASKS
				  |ERTS_PTS_FLG_HAVE_TASKS
				  |ERTS_PTS_FLGS_BUSY));

    erts_port_task_sched_lock(&pp->sched);

    /* Clean up nosuspend handles... */
    free_nshp = (pp->sched.taskq.local.busy.nosuspend
		 ? get_free_nosuspend_handles(pp)
		 : NULL);
    ASSERT(!pp->sched.taskq.local.busy.nosuspend);

    /* Make sure not to leave any processes suspended on the port... */
    plp = pp->suspended;
    pp->suspended = NULL;

    erts_port_task_sched_unlock(&pp->sched);

    if (free_nshp)
	free_nosuspend_handles(free_nshp);

    if (erts_proclist_fetch(&plp, NULL)) {
#ifdef USE_VM_PROBES
	if (DTRACE_ENABLED(process_port_unblocked)) {
	    DTRACE_CHARBUF(port_str, 16);
	    DTRACE_CHARBUF(pid_str, 16);
	    ErtsProcList* plp2 = plp;

	    erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", pp->common.id);
	    while (plp2 != NULL) {
		erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->u.pid);
		DTRACE2(process_port_unblocked, pid_str, port_str);
		plp2 = plp2->next; /* advance, or this loop would never terminate */
	    }
	}
#endif
	erts_resume_processes(plp);
    }

    /*
     * Schedule cleanup of the port structure...
     */
    /* We might not be running on a scheduler thread, e.g. when tracing
     * to a port from the sys_msg_dispatcher thread. */
    if (!erts_get_scheduler_data()) {
      erts_schedule_misc_aux_work(1, schedule_release_port, (void*)pp);
    } else {
      /* Has to happen more or less immediately so that the driver
       * can be released. */
      erts_schedule_thr_prgr_later_op(release_port,
				      (void *) pp,
				      &pp->common.u.release);
    }
}


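/* Run-queue hooks used by the run-queue management code (e.g. when
 * evacuating one run queue into another). */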
void
erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
{
    ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
    ASSERT(rq == erts_get_runq_port(pp));
    ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ);
    enqueue_port(rq, pp);
}

Port *
erts_dequeue_port(ErtsRunQueue *rq)
{
    Port *pp;
    ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
    pp = pop_port(rq);
    ASSERT(!pp || rq == erts_get_runq_port(pp));
    ASSERT(!pp || (erts_atomic32_read_nob(&pp->sched.flags)
		   & ERTS_PTS_FLG_IN_RUNQ));
    return pp;
}


/*
 * Initialize the module.
 */
void
erts_port_task_init(void)
{
#if ERTS_POLL_USE_SCHEDULER_POLLING
    erts_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
                         (erts_aint_t) 0);
#endif
    init_port_task_alloc(erts_no_schedulers + erts_no_poll_threads
                         + 1); /* aux_thread */
    init_busy_caller_table_alloc();
}