1 /*
2  * %CopyrightBegin%
3  *
4  * Copyright Ericsson AB 2006-2020. All Rights Reserved.
5  *
6  * Licensed under the Apache License, Version 2.0 (the "License");
7  * you may not use this file except in compliance with the License.
8  * You may obtain a copy of the License at
9  *
10  *     http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing, software
13  * distributed under the License is distributed on an "AS IS" BASIS,
14  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15  * See the License for the specific language governing permissions and
16  * limitations under the License.
17  *
18  * %CopyrightEnd%
19  */
20 
21 /*
22  * Description:	Scheduling of port tasks
23  *
24  * Author: 	Rickard Green
25  */
26 
27 #define ERL_PORT_TASK_C__
28 
29 #ifdef HAVE_CONFIG_H
30 #  include "config.h"
31 #endif
32 
33 #include "global.h"
34 #include "erl_port_task.h"
35 #include "dist.h"
36 #include "erl_check_io.h"
37 #include "dtrace-wrapper.h"
38 #include "lttng-wrapper.h"
39 #include "erl_check_io.h"
40 #include <stdarg.h>
41 
42 /*
43  * ERTS_PORT_CALLBACK_VREDS: Limit the amount of callback calls we do...
44  */
45 #define ERTS_PORT_CALLBACK_VREDS (CONTEXT_REDS/20)
46 
47 #if defined(DEBUG) && 0
48 #define ERTS_HARD_DEBUG_TASK_QUEUES
49 #else
50 #undef ERTS_HARD_DEBUG_TASK_QUEUES
51 #endif
52 
53 #ifdef ERTS_HARD_DEBUG_TASK_QUEUES
54 static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue);
55 #define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) \
56     chk_task_queues((PP), (EQ), (PBQ))
57 #else
58 #define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ)
59 #endif
60 
61 #ifdef USE_VM_PROBES
62 #define DTRACE_DRIVER(PROBE_NAME, PP)                              \
63     if (DTRACE_ENABLED(PROBE_NAME)) {                              \
64         DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE);         \
65         DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE);            \
66                                                                    \
67         dtrace_pid_str(ERTS_PORT_GET_CONNECTED(PP), process_str);  \
68         dtrace_port_str(PP, port_str);                             \
69         DTRACE3(PROBE_NAME, process_str, port_str, PP->name);      \
70     }
71 #else
72 #define  DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0)
73 #endif
74 #ifdef USE_LTTNG_VM_TRACEPOINTS
75 #define LTTNG_DRIVER(TRACEPOINT, PP)                              \
76     if (LTTNG_ENABLED(TRACEPOINT)) {                              \
77         lttng_decl_portbuf(port_str);                             \
78         lttng_decl_procbuf(proc_str);                             \
79         lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(PP), proc_str);  \
80         lttng_port_to_str((PP), port_str);                        \
81         LTTNG3(TRACEPOINT, proc_str, port_str, (PP)->name);       \
82     }
83 #else
84 #define LTTNG_DRIVER(TRACEPOINT, PP) do {} while(0)
85 #endif
86 
87 #define ERTS_LC_VERIFY_RQ(RQ, PP)				\
88     do {							\
89 	ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));		\
90 	ERTS_LC_ASSERT((RQ) == erts_get_runq_port((PP)));       \
91     } while (0)
92 
93 #define ERTS_PT_STATE_SCHEDULED		0
94 #define ERTS_PT_STATE_ABORTED		1
95 #define ERTS_PT_STATE_EXECUTING		2
96 
/*
 * Type-specific payload of a port task. Which member is valid is
 * selected by ErtsPortTask_.type.
 */
typedef union {
    struct { /* I/O tasks */
	ErlDrvEvent event;
#if ERTS_POLL_USE_SCHEDULER_POLLING
        int is_scheduler_event; /* event is polled by the scheduler poll-set */
#endif
    } io;
    struct { /* proc-to-port signal tasks (ERTS_PORT_TASK_PROC_SIG) */
	ErtsProc2PortSigCallback callback;
	ErtsProc2PortSigData data;
    } psig;
} ErtsPortTaskTypeData;
109 
/* A single scheduled unit of work for a port. */
struct ErtsPortTask_ {
    erts_atomic32_t state;      /* ERTS_PT_STATE_SCHEDULED/ABORTED/EXECUTING */
    ErtsPortTaskType type;
    union {
	struct {
	    ErtsPortTask *next;         /* next task in queue */
	    ErtsPortTaskHandle *handle; /* optional handle referring to this task */
	    int flags;                  /* ERTS_PT_FLG_* */
	    Uint32 ref[ERTS_MAX_REF_NUMBERS];
	    ErtsPortTaskTypeData td;    /* payload selected by 'type' */
	} alive;
	ErtsThrPrgrLaterOp release;     /* used while scheduled for deallocation */
    } u;
};
124 
/* Node in a port's list of no-suspend task handles. */
struct ErtsPortTaskHandleList_ {
    ErtsPortTaskHandle handle;
    union {
	ErtsPortTaskHandleList *next; /* while linked into the list */
	ErtsThrPrgrLaterOp release;   /* while scheduled for deallocation */
    } u;
};
132 
/* Per-caller bookkeeping for tasks parked in the busy queue. */
typedef struct ErtsPortTaskBusyCaller_ ErtsPortTaskBusyCaller;
struct ErtsPortTaskBusyCaller_ {
    ErtsPortTaskBusyCaller *next; /* hash-bucket chain */
    Eterm caller;                 /* pid of the caller */
    SWord count;                  /* number of busy-queued tasks for this caller */
    ErtsPortTask *last;           /* last busy-queued task for this caller */
};
140 
/* Hash table: caller id -> ErtsPortTaskBusyCaller, for the busy queue. */
#define ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS 17
struct ErtsPortTaskBusyCallerTable_ {
    ErtsPortTaskBusyCaller *bucket[ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS];
    /* One entry pre-allocated with the table; avoids a heap allocation
     * in the common single-busy-caller case. */
    ErtsPortTaskBusyCaller pre_alloc_busy_caller;
};
146 
147 #if ERTS_POLL_USE_SCHEDULER_POLLING
148 erts_atomic_t erts_port_task_outstanding_io_tasks;
149 #endif
150 
151 static void begin_port_cleanup(Port *pp,
152 			       ErtsPortTask **execq,
153 			       int *processing_busy_q_p);
154 
155 ERTS_THR_PREF_QUICK_ALLOC_IMPL(port_task,
156                                ErtsPortTask,
157                                1000,
158                                ERTS_ALC_T_PORT_TASK)
159 
160 ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table,
161 				 ErtsPortTaskBusyCallerTable,
162 				 50,
163 				 ERTS_ALC_T_BUSY_CALLER_TAB)
164 
165 static void
call_port_task_free(void * vptp)166 call_port_task_free(void *vptp)
167 {
168     port_task_free((ErtsPortTask *) vptp);
169 }
170 
171 static ERTS_INLINE void
schedule_port_task_free(ErtsPortTask * ptp)172 schedule_port_task_free(ErtsPortTask *ptp)
173 {
174     erts_schedule_thr_prgr_later_cleanup_op(call_port_task_free,
175 					    (void *) ptp,
176 					    &ptp->u.release,
177 					    sizeof(ErtsPortTask));
178 }
179 
180 static ERTS_INLINE ErtsPortTask *
p2p_sig_data_to_task(ErtsProc2PortSigData * sigdp)181 p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp)
182 {
183     ErtsPortTask *ptp;
184     char *ptr = (char *) sigdp;
185     ptr -= offsetof(ErtsPortTask, u.alive.td.psig.data);
186     ptp = (ErtsPortTask *) ptr;
187     ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
188     return ptp;
189 }
190 
191 static ERTS_INLINE ErtsProc2PortSigData *
p2p_sig_data_init(ErtsPortTask * ptp)192 p2p_sig_data_init(ErtsPortTask *ptp)
193 {
194 
195     ptp->type = ERTS_PORT_TASK_PROC_SIG;
196     ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP;
197     erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
198 
199     ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data));
200 
201     return &ptp->u.alive.td.psig.data;
202 }
203 
204 ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data(void)205 erts_port_task_alloc_p2p_sig_data(void)
206 {
207     ErtsPortTask *ptp = port_task_alloc();
208 
209     return p2p_sig_data_init(ptp);
210 }
211 
212 ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data_extra(size_t extra,void ** extra_ptr)213 erts_port_task_alloc_p2p_sig_data_extra(size_t extra, void **extra_ptr)
214 {
215     ErtsPortTask *ptp = erts_alloc(ERTS_ALC_T_PORT_TASK,
216                                    sizeof(ErtsPortTask) + extra);
217 
218     *extra_ptr = ptp+1;
219 
220     return p2p_sig_data_init(ptp);
221 }
222 
223 void
erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData * sigdp)224 erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData *sigdp)
225 {
226     schedule_port_task_free(p2p_sig_data_to_task(sigdp));
227 }
228 
229 static ERTS_INLINE Eterm
task_caller(ErtsPortTask * ptp)230 task_caller(ErtsPortTask *ptp)
231 {
232     Eterm caller;
233 
234     ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
235 
236     caller = ptp->u.alive.td.psig.data.caller;
237 
238     ASSERT(is_internal_pid(caller) || is_internal_port(caller));
239 
240     return caller;
241 }
242 
243 /*
244  * Busy queue management
245  */
246 
247 static ERTS_INLINE int
caller2bix(Eterm caller)248 caller2bix(Eterm caller)
249 {
250     ASSERT(is_internal_pid(caller) || is_internal_port(caller));
251     return (int) (_GET_PID_DATA(caller) % ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS);
252 }
253 
254 
/*
 * Update the busy-caller table after 'ptp' has been popped from the
 * busy queue. 'last' is non-zero when 'ptp' was the last task in the
 * queue; in that case the (then empty) table is released and the
 * HAVE_BUSY_TASKS state on the port is cleared.
 */
static void
popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last)
{
    ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp;
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    Eterm caller = task_caller(ptp);
    int bix = caller2bix(caller);

    ASSERT(is_internal_pid(caller));

    ASSERT(tabp);
    bcp = tabp->bucket[bix];
    prev_bcpp = &tabp->bucket[bix];
    ASSERT(bcp);
    /* Find this caller's entry; it must exist for a busy-queued task. */
    while (bcp->caller != caller) {
	prev_bcpp = &bcp->next;
	bcp = bcp->next;
	ASSERT(bcp);
    }
    ASSERT(bcp->count > 0);
    if (--bcp->count != 0) {
	ASSERT(!last); /* caller still has tasks in the busy queue */
    }
    else {
	/* Last busy task for this caller: unlink and release its entry. */
	*prev_bcpp = bcp->next;
	if (bcp == &tabp->pre_alloc_busy_caller)
	    bcp->caller = am_undefined; /* mark pre-allocated entry as free */
	else
	    erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
	if (last) {
	    /* Busy queue is now empty: drop the flag and free the table. */
#ifdef DEBUG
	    erts_aint32_t flags =
#endif
		erts_atomic32_read_band_nob(
		    &pp->sched.flags,
		    ~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
	    ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
#ifdef DEBUG
	    /* All buckets must be empty when the queue is empty. */
	    for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
		ASSERT(!tabp->bucket[bix]);
	    }
#endif
	    busy_caller_table_free(tabp);
	    pp->sched.taskq.local.busy.first = NULL;
	    pp->sched.taskq.local.busy.last = NULL;
	    pp->sched.taskq.local.busy.table = NULL;
	}
    }
}
304 
/*
 * Append 'ptp' (a task that must wait while the port is busy) to the
 * busy queue and record its caller in the busy-caller table. Creates
 * the table and sets HAVE_BUSY_TASKS when the queue was empty.
 */
static void
busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    Eterm caller = task_caller(ptp);
    ErtsPortTaskBusyCaller *bcp;
    int bix;

    ASSERT(is_internal_pid(caller));
    /*
     * Port is busy and this task type needs to wait until not busy.
     */

    ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY);

    /* Append the task to the busy queue... */
    ptp->u.alive.next = NULL;
    if (pp->sched.taskq.local.busy.last) {
	ASSERT(pp->sched.taskq.local.busy.first);
	pp->sched.taskq.local.busy.last->u.alive.next = ptp;
    }
    else {
	int i;
#ifdef DEBUG
	erts_aint32_t flags;
#endif
	pp->sched.taskq.local.busy.first = ptp;

#ifdef DEBUG
	flags =
#endif
	    erts_atomic32_read_bor_nob(&pp->sched.flags,
					   ERTS_PTS_FLG_HAVE_BUSY_TASKS);
	ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS));

	ASSERT(!tabp);

	/* First busy task: set up an empty busy-caller table. */
	tabp = busy_caller_table_alloc();
	pp->sched.taskq.local.busy.table = tabp;
	for (i = 0; i < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; i++)
	    tabp->bucket[i] = NULL;
	tabp->pre_alloc_busy_caller.caller = am_undefined;
    }
    pp->sched.taskq.local.busy.last = ptp;

    /* ...and bump (or create) the caller's table entry. */
    bix = caller2bix(caller);
    ASSERT(tabp);
    bcp = tabp->bucket[bix];

    while (bcp && bcp->caller != caller)
	bcp = bcp->next;

    if (bcp)
	bcp->count++;
    else {
	/* Use the pre-allocated entry if free; otherwise heap-allocate. */
	if (tabp->pre_alloc_busy_caller.caller == am_undefined)
	    bcp = &tabp->pre_alloc_busy_caller;
	else
	    bcp = erts_alloc(ERTS_ALC_T_BUSY_CALLER,
			     sizeof(ErtsPortTaskBusyCaller));
	bcp->caller = caller;
	bcp->count = 1;
	bcp->next = tabp->bucket[bix];
	tabp->bucket[bix] = bcp;
    }

    bcp->last = ptp;
}
372 
/*
 * Check whether a signal-dependent task must be parked in the busy
 * queue because earlier tasks from the same caller are already there
 * (per-caller signal order must be preserved). Returns non-zero when
 * the task was inserted into the busy queue, 0 otherwise.
 */
static ERTS_INLINE int
check_sig_dep_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTask *last_ptp;
    ErtsPortTaskBusyCaller *bcp;
    int bix;
    Eterm caller;

    ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP);
    ASSERT(pp->sched.taskq.local.busy.last);
    ASSERT(tabp);


    /*
     * We are either not busy, or the task does not imply wait on busy port.
     * However, due to the signaling order requirements the task might depend
     * on other tasks in the busy queue.
     */

    caller = task_caller(ptp);
    bix = caller2bix(caller);
    bcp = tabp->bucket[bix];
    while (bcp && bcp->caller != caller)
	bcp = bcp->next;

    if (!bcp)
	return 0; /* no queued tasks from this caller; no dependency */

    /*
     * There are other tasks that we depend on in the busy queue;
     * move into busy queue.
     */

    bcp->count++;
    /* Insert directly after the caller's current last busy task. */
    last_ptp = bcp->last;
    ptp->u.alive.next = last_ptp->u.alive.next;
    if (!ptp->u.alive.next) {
	ASSERT(pp->sched.taskq.local.busy.last == last_ptp);
	pp->sched.taskq.local.busy.last = ptp;
    }
    last_ptp->u.alive.next = ptp;
    bcp->last = ptp;

    return 1;
}
419 
420 static void
no_sig_dep_move_from_busyq(Port * pp)421 no_sig_dep_move_from_busyq(Port *pp)
422 {
423     ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
424     ErtsPortTask *first_ptp, *last_ptp, *ptp;
425     ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp = NULL;
426 
427     /*
428      * Move tasks at the head of the busy queue that no longer
429      * have any dependencies to busy wait tasks into the ordinary
430      * queue.
431      */
432 
433     first_ptp = ptp = pp->sched.taskq.local.busy.first;
434 
435     ASSERT(ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
436     ASSERT(tabp);
437 
438     do {
439 	Eterm caller = task_caller(ptp);
440 
441 	if (!bcp || bcp->caller != caller) {
442 	    int bix = caller2bix(caller);
443 
444 	    prev_bcpp = &tabp->bucket[bix];
445 	    bcp = tabp->bucket[bix];
446 	    ASSERT(bcp);
447 	    while (bcp->caller != caller) {
448 		ASSERT(bcp);
449 		prev_bcpp = &bcp->next;
450 		bcp = bcp->next;
451 	    }
452 	}
453 
454 	ASSERT(bcp->caller == caller);
455 	ASSERT(bcp->count > 0);
456 
457 	if (--bcp->count == 0) {
458 	    *prev_bcpp = bcp->next;
459 	    if (bcp == &tabp->pre_alloc_busy_caller)
460 		bcp->caller = am_undefined;
461 	    else
462 		erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
463 	}
464 
465 	last_ptp = ptp;
466 	ptp = ptp->u.alive.next;
467     } while (ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
468 
469     pp->sched.taskq.local.busy.first = last_ptp->u.alive.next;
470     if (!pp->sched.taskq.local.busy.first) {
471 #ifdef DEBUG
472 	int bix;
473 	erts_aint32_t flags =
474 #endif
475 	    erts_atomic32_read_band_nob(
476 		&pp->sched.flags,
477 		~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
478 	ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
479 #ifdef DEBUG
480 	for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
481 	    ASSERT(!tabp->bucket[bix]);
482 	}
483 #endif
484 	busy_caller_table_free(tabp);
485 	pp->sched.taskq.local.busy.last = NULL;
486 	pp->sched.taskq.local.busy.table = NULL;
487     }
488     last_ptp->u.alive.next = pp->sched.taskq.local.first;
489     pp->sched.taskq.local.first = first_ptp;
490 }
491 
492 #ifdef ERTS_HARD_DEBUG_TASK_QUEUES
493 
/*
 * Hard-debug consistency check of the busy queue against the
 * busy-caller table: every busy task's caller must have a table entry
 * whose count matches the number of its queued tasks, no task may
 * appear in both queues, and the table totals must agree with the
 * queue contents. Compiled only under ERTS_HARD_DEBUG_TASK_QUEUES.
 */
static void
chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue)
{
    Sint tot_count, tot_table_count;
    int bix;
    ErtsPortTask *ptp, *last;
    /* When the busy queue is being executed it lives in 'execq'. */
    ErtsPortTask *first = processing_busy_queue ? execq : pp->sched.taskq.local.busy.first;
    ErtsPortTask *nb_task_queue = processing_busy_queue ? pp->sched.taskq.local.first : execq;
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTaskBusyCaller *bcp;

    if (!first) {
	/* Empty busy queue implies no table and no HAVE_BUSY_TASKS flag. */
	ASSERT(!tabp);
	ASSERT(!pp->sched.taskq.local.busy.last);
	ASSERT(!(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
	return;
    }

    ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
    ASSERT(tabp);

    tot_count = 0;
    ptp = first;
    while (ptp) {
	Sint count = 0;
	Eterm caller = task_caller(ptp);
	int bix = caller2bix(caller);
	for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
	    if (bcp->caller == caller)
		break;
	ASSERT(bcp && bcp->caller == caller);

	ASSERT(bcp->last);
	/* Walk this caller's run of consecutive tasks up to bcp->last. */
	while (1) {
	    ErtsPortTask *ptp2;

	    ASSERT(caller == task_caller(ptp));
	    count++;
	    tot_count++;
	    last = ptp;

	    /* The task must not also be linked into the other queue. */
	    for (ptp2 = nb_task_queue; ptp2; ptp2 = ptp2->u.alive.next) {
		ASSERT(ptp != ptp2);
	    }

	    if (ptp == bcp->last)
		break;
	    ptp = ptp->u.alive.next;
	}

	ASSERT(count == bcp->count);
	ptp = ptp->u.alive.next;
    }

    /* Queue totals must match table totals. */
    tot_table_count = 0;
    for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
	for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
	    tot_table_count += bcp->count;
    }

    ASSERT(tot_count == tot_table_count);

    ASSERT(last == pp->sched.taskq.local.busy.last);
}
558 
559 #endif /* ERTS_HARD_DEBUG_TASK_QUEUES */
560 
561 /*
562  * Task handle manipulation.
563  */
564 
565 static ERTS_INLINE void
reset_port_task_handle(ErtsPortTaskHandle * pthp)566 reset_port_task_handle(ErtsPortTaskHandle *pthp)
567 {
568     erts_atomic_set_relb(pthp, (erts_aint_t) NULL);
569 }
570 
571 static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle * pthp)572 handle2task(ErtsPortTaskHandle *pthp)
573 {
574     return (ErtsPortTask *) erts_atomic_read_acqb(pthp);
575 }
576 
577 static ERTS_INLINE void
reset_handle(ErtsPortTask * ptp)578 reset_handle(ErtsPortTask *ptp)
579 {
580     if (ptp->u.alive.handle) {
581 	ASSERT(ptp == handle2task(ptp->u.alive.handle));
582 	reset_port_task_handle(ptp->u.alive.handle);
583     }
584 }
585 
/*
 * Reset the handle of an executed I/O task, notifying the I/O
 * subsystem. For scheduler-polled events the notification is only
 * issued when fd-cleanup has been requested on the port; otherwise the
 * handle is reset directly. For all other events the handle is reset
 * by the I/O subsystem via the passed callback.
 */
static ERTS_INLINE void
reset_executed_io_task_handle(Port *prt, ErtsPortTask *ptp)
{
    if (ptp->u.alive.handle) {
	ASSERT(ptp == handle2task(ptp->u.alive.handle));
#if ERTS_POLL_USE_SCHEDULER_POLLING
        if (ptp->u.alive.td.io.is_scheduler_event) {
            if ((erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLG_CHECK_FD_CLEANUP)) {
                erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle,
                                                  reset_port_task_handle);
                /* Cleanup request consumed; clear the flag. */
                erts_atomic32_read_band_nob(&prt->state, ~ERTS_PORT_SFLG_CHECK_FD_CLEANUP);
            } else {
                reset_port_task_handle(ptp->u.alive.handle);
            }
        } else
#endif
        {
            /* The port task handle is reset inside task_executed */
            erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle,
                                              reset_port_task_handle);
        }
    }
}
609 
610 static ERTS_INLINE void
set_handle(ErtsPortTask * ptp,ErtsPortTaskHandle * pthp)611 set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
612 {
613     ptp->u.alive.handle = pthp;
614     if (pthp) {
615 	erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
616 	ASSERT(ptp == handle2task(ptp->u.alive.handle));
617     }
618 }
619 
620 static ERTS_INLINE void
set_tmp_handle(ErtsPortTask * ptp,ErtsPortTaskHandle * pthp)621 set_tmp_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
622 {
623     ptp->u.alive.handle = NULL;
624     if (pthp) {
625 	/*
626 	 * IMPORTANT! Task either need to be aborted, or task handle
627 	 * need to be detached before thread progress has been made.
628 	 */
629 	erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
630     }
631 }
632 
633 
634 /*
635  * Busy port queue management
636  */
637 
/*
 * Possibly leave the busy-port-queue state: when the queued data size
 * has dropped below the low limit, clear both BUSY_PORT_Q and
 * CHK_UNSET_BUSY_PORT_Q and resume suspended processes (unless the
 * port is busy for some other reason). Returns the updated flags.
 * Caller must hold the port lock.
 */
static erts_aint32_t
check_unset_busy_port_q(Port *pp,
			erts_aint32_t flags,
			ErtsPortTaskBusyPortQ *bpq)
{
    ErlDrvSizeT qsize, low;
    int resume_procs = 0;

    ASSERT(bpq);
    ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));

    erts_port_task_sched_lock(&pp->sched);
    qsize = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
    low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
    if (qsize < low) {
	erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
			       | ERTS_PTS_FLG_BUSY_PORT_Q);
	flags = erts_atomic32_read_band_relb(&pp->sched.flags, mask);
	/* Resume only if the busy queue was the sole busy reason. */
	if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
	    resume_procs = 1;
    }
    else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
	/* Still above the low limit; just consume the check request. */
	flags = erts_atomic32_read_band_relb(&pp->sched.flags,
						 ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
	flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
    }
    erts_port_task_sched_unlock(&pp->sched);
    /* Resume outside the sched lock. */
    if (resume_procs)
	erts_port_resume_procs(pp);

    return flags;
}
670 
/*
 * Account for 'size' bytes of proc-to-port signal data whose task was
 * aborted: subtract from the busy-port-queue size and, if the port is
 * in the busy-port-queue state and we dropped below the low limit,
 * request a busy-state re-check.
 */
static ERTS_INLINE void
aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
	return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
						      (erts_aint_t) -size);
    ASSERT(qsz + size > qsz); /* no underflow */
    flags = erts_atomic32_read_nob(&pp->sched.flags);
    ASSERT(pp->sched.taskq.bpq);
    /* Only act when busy due to the queue and no check already pending. */
    if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
		  | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q)
	return;
    if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low))
	erts_atomic32_read_bor_nob(&pp->sched.flags,
				       ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}
697 
/*
 * Account for 'size' bytes of proc-to-port signal data dequeued for
 * execution; may immediately unset the busy-port-queue state when the
 * size drops below the low limit.
 */
static ERTS_INLINE void
dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
	return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
						      (erts_aint_t) -size);
    ASSERT(qsz + size > qsz); /* no underflow */
    flags = erts_atomic32_read_nob(&pp->sched.flags);
    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
	return;
    if (qsz < (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->low))
	check_unset_busy_port_q(pp, flags, bpq);
}
721 
/*
 * Account for newly enqueued proc-to-port signal data, possibly
 * marking the port busy when the high limit is exceeded. Returns the
 * (possibly updated) sched flags. Caller holds the port task sched
 * lock.
 */
static ERTS_INLINE erts_aint32_t
enqueue_proc2port_data(Port *pp,
		       ErtsProc2PortSigData *sigdp,
		       erts_aint32_t flags)
{
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
    if (sigdp && bpq) {
	ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
	if (size) {
	    erts_aint_t asize = erts_atomic_add_read_acqb(&bpq->size,
							      (erts_aint_t) size);
	    ErlDrvSizeT qsz = (ErlDrvSizeT) asize;

	    ASSERT(qsz - size < qsz); /* no overflow */

	    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
		flags = erts_atomic32_read_bor_acqb(&pp->sched.flags,
							ERTS_PTS_FLG_BUSY_PORT_Q);
		flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
		/* Size may already have dropped below the low limit
		 * concurrently; if so, request a busy-state re-check. */
		qsz = (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->size);
		if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low)) {
		    flags = (erts_atomic32_read_bor_relb(
				 &pp->sched.flags,
				 ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
		    flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
		}
	    }
	    ASSERT(!(flags & ERTS_PTS_FLG_EXIT));
	}
    }
    return flags;
}
754 
755 /*
756  * erl_drv_busy_msgq_limits() is called by drivers either reading or
757  * writing the limits.
758  *
759  * A limit of zero is interpreted as a read only request (using a
760  * limit of zero would not be useful). Other values are interpreted
761  * as a write-read request.
762  */
763 
/*
 * Read and/or write the busy message-queue limits of a port (see the
 * comment above). A zero value in *lowp / *highp means "read only";
 * ERL_DRV_BUSY_MSGQ_DISABLED disables the feature entirely. On return,
 * *lowp / *highp hold the effective limits (or DISABLED).
 */
void
erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
{
    Port *pp = erts_drvport2port(dport);
    ErtsPortTaskBusyPortQ *bpq;
    int written = 0, resume_procs = 0;
    ErlDrvSizeT low, high;

    /* Invalid port or feature already disabled: report DISABLED. */
    if (pp == ERTS_INVALID_ERL_DRV_PORT || !(bpq = pp->sched.taskq.bpq)) {
	if (lowp)
	    *lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
	if (highp)
	    *highp = ERL_DRV_BUSY_MSGQ_DISABLED;
	return;
    }

    low = lowp ? *lowp : 0;
    high = highp ? *highp : 0;

    erts_port_task_sched_lock(&pp->sched);

    if (low == ERL_DRV_BUSY_MSGQ_DISABLED
	|| high == ERL_DRV_BUSY_MSGQ_DISABLED) {
	/* Disable busy msgq feature */
	erts_aint32_t flags;
	pp->sched.taskq.bpq = NULL;
	flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
	flags = erts_atomic32_read_band_acqb(&pp->sched.flags, flags);
	/* Resume only if the busy queue was the sole busy reason. */
	if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
	    resume_procs = 1;
    }
    else {

	/* Write new low limit, keeping high >= low. */
	if (!low)
	    low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
	else {
	    if (bpq->high < low)
		bpq->high = low;
	    erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
	    written = 1;
	}

	/* Write new high limit, keeping low <= high. */
	if (!high)
	    high = bpq->high;
	else {
	    if (low > high) {
		low = high;
		erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
	    }
	    bpq->high = high;
	    written = 1;
	}

	if (written) {
	    /* Re-evaluate busy state against the new limits. */
	    ErlDrvSizeT size = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
	    if (size > high)
		erts_atomic32_read_bor_relb(&pp->sched.flags,
						ERTS_PTS_FLG_BUSY_PORT_Q);
	    else if (size < low)
		erts_atomic32_read_bor_relb(&pp->sched.flags,
						ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
	}
    }

    erts_port_task_sched_unlock(&pp->sched);

    if (resume_procs)
	erts_port_resume_procs(pp);
    if (lowp)
	*lowp = low;
    if (highp)
	*highp = high;
}
837 
838 /*
839  * No-suspend handles.
840  */
841 
842 static void
free_port_task_handle_list(void * vpthlp)843 free_port_task_handle_list(void *vpthlp)
844 {
845     erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
846 }
847 
848 static void
schedule_port_task_handle_list_free(ErtsPortTaskHandleList * pthlp)849 schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
850 {
851     erts_schedule_thr_prgr_later_cleanup_op(free_port_task_handle_list,
852 					    (void *) pthlp,
853 					    &pthlp->u.release,
854 					    sizeof(ErtsPortTaskHandleList));
855 }
856 
857 static ERTS_INLINE void
abort_signal_task(Port * pp,int abort_type,ErtsPortTaskType type,ErtsPortTaskTypeData * tdp,int bpq_data)858 abort_signal_task(Port *pp,
859                   int abort_type,
860                   ErtsPortTaskType type,
861                   ErtsPortTaskTypeData *tdp,
862                   int bpq_data)
863 {
864 
865     ASSERT(type == ERTS_PORT_TASK_PROC_SIG);
866 
867     if (!bpq_data)
868 	tdp->psig.callback(NULL,
869 			   ERTS_PORT_SFLG_INVALID,
870 			   abort_type,
871 			   &tdp->psig.data);
872     else {
873 	ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data);
874 	tdp->psig.callback(NULL,
875 			   ERTS_PORT_SFLG_INVALID,
876 			   abort_type,
877 			   &tdp->psig.data);
878 	aborted_proc2port_data(pp, size);
879     }
880 }
881 
882 
883 static ERTS_INLINE void
abort_nosuspend_task(Port * pp,ErtsPortTaskType type,ErtsPortTaskTypeData * tdp,int bpq_data)884 abort_nosuspend_task(Port *pp,
885 		     ErtsPortTaskType type,
886 		     ErtsPortTaskTypeData *tdp,
887 		     int bpq_data)
888 {
889     abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, type, tdp, bpq_data);
890 }
891 
/*
 * Detach the leading run of no-longer-scheduled no-suspend handles
 * from the port and return them (for deallocation). Clears the
 * HAVE_NS_TASKS flag when the list becomes empty. Caller must hold
 * the port task sched lock.
 */
static ErtsPortTaskHandleList *
get_free_nosuspend_handles(Port *pp)
{
    ErtsPortTaskHandleList *nshp, *last_nshp = NULL;

    ERTS_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched));

    nshp = pp->sched.taskq.local.busy.nosuspend;

    /* Find the end of the leading run of unscheduled handles. */
    while (nshp && !erts_port_task_is_scheduled(&nshp->handle)) {
	last_nshp = nshp;
	nshp = nshp->u.next;
    }

    if (!last_nshp)
	nshp = NULL; /* nothing to free */
    else {
	/* Unlink the run [head, last_nshp] and return it. */
	nshp = pp->sched.taskq.local.busy.nosuspend;
	pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next;
	last_nshp->u.next = NULL;
	if (!pp->sched.taskq.local.busy.nosuspend)
	    erts_atomic32_read_band_nob(&pp->sched.flags,
					    ~ERTS_PTS_FLG_HAVE_NS_TASKS);
    }
    return nshp;
}
918 
919 static void
free_nosuspend_handles(ErtsPortTaskHandleList * free_nshp)920 free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp)
921 {
922     while (free_nshp) {
923 	ErtsPortTaskHandleList *nshp = free_nshp;
924 	free_nshp = free_nshp->u.next;
925 	schedule_port_task_handle_list_free(nshp);
926     }
927 }
928 
929 /*
930  * Port queue operations
931  */
932 
/*
 * Append 'pp' to the run queue's port queue and bump its port run
 * queue length. Caller must hold the run queue lock.
 */
static ERTS_INLINE void
enqueue_port(ErtsRunQueue *runq, Port *pp)
{
    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
    pp->sched.next = NULL;
    if (runq->ports.end) {
	ASSERT(runq->ports.start);
	runq->ports.end->sched.next = pp;
    }
    else {
	ASSERT(!runq->ports.start);
	runq->ports.start = pp;
    }

    runq->ports.end = pp;
    ASSERT(runq->ports.start && runq->ports.end);

    erts_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);

    /* While halting, wake the run queue so the port gets executed. */
    if (ERTS_RUNQ_FLGS_GET_NOB(runq) & ERTS_RUNQ_FLG_HALTING)
	erts_non_empty_runq(runq);
}
955 
/*
 * Dequeue and return the first port of the run queue's port queue, or
 * NULL if the queue is empty. Caller must hold the run queue lock.
 */
static ERTS_INLINE Port *
pop_port(ErtsRunQueue *runq)
{
    Port *pp = runq->ports.start;
    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
    if (!pp) {
	ASSERT(!runq->ports.end);
    }
    else {
	runq->ports.start = runq->ports.start->sched.next;
	if (!runq->ports.start) {
	    ASSERT(runq->ports.end == pp);
	    runq->ports.end = NULL;
	}
	erts_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
    }

    /* start and end are either both set or both NULL. */
    ASSERT(runq->ports.start || !runq->ports.end);
    ASSERT(runq->ports.end || !runq->ports.start);
    return pp;
}
977 
978 /*
979  * Task queue operations
980  */
981 
/*
 * Append 'ptp' to the port's incoming task queue. Fails (returns 0,
 * without enqueueing) if the port is exiting, or if a no-suspend
 * handle list is given and the port is busy; returns 1 on success.
 * The resulting sched flags are stored through 'flagsp'.
 */
static ERTS_INLINE int
enqueue_task(Port *pp,
	     ErtsPortTask *ptp,
	     ErtsProc2PortSigData *sigdp,
	     ErtsPortTaskHandleList *ns_pthlp,
	     erts_aint32_t *flagsp)

{
    int res;
    erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT;
    erts_aint32_t flags;
    ptp->u.alive.next = NULL;
    /* No-suspend tasks must also fail when the port is busy. */
    if (ns_pthlp)
	fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
    erts_port_task_sched_lock(&pp->sched);
    flags = erts_atomic32_read_nob(&pp->sched.flags);
    if (flags & fail_flags)
	res = 0;
    else {
	if (ns_pthlp) {
	    /* Record the no-suspend handle on the port. */
	    ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
	    pp->sched.taskq.local.busy.nosuspend = ns_pthlp;
	}
	if (pp->sched.taskq.in.last) {
	    ASSERT(pp->sched.taskq.in.first);
	    ASSERT(!pp->sched.taskq.in.last->u.alive.next);

	    pp->sched.taskq.in.last->u.alive.next = ptp;
	}
	else {
	    ASSERT(!pp->sched.taskq.in.first);

	    pp->sched.taskq.in.first = ptp;
	}
	pp->sched.taskq.in.last = ptp;
	/* Account proc-to-port data against the busy-port-queue. */
	flags = enqueue_proc2port_data(pp, sigdp, flags);
	res = 1;
    }
    erts_port_task_sched_unlock(&pp->sched);
    *flagsp = flags;
    return res;
}
1024 
/*
 * Prepare execution of a port's tasks. Selects the queue to start
 * executing from (the local busy queue first, unless the port is
 * currently busy, in which case the normal local queue is used) and
 * then atomically transfers the port from "in run queue" state to
 * "executing" state (clears ERTS_PTS_FLG_IN_RUNQ, sets
 * ERTS_PTS_FLG_EXEC).
 */
static ERTS_INLINE void
prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    erts_aint32_t act = erts_atomic32_read_nob(&pp->sched.flags);

    if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) {
	*execqp = pp->sched.taskq.local.first;
	*processing_busy_q_p = 0;
    }
    else {
	*execqp = pp->sched.taskq.local.busy.first;
	*processing_busy_q_p = 1;
    }

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    /* CAS loop: retry until the flag transition is applied atomically */
    while (1) {
	erts_aint32_t new, exp;

	new = exp = act;

	new &= ~ERTS_PTS_FLG_IN_RUNQ;
	new |= ERTS_PTS_FLG_EXEC;

	act = erts_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp);

	ASSERT(act & ERTS_PTS_FLG_IN_RUNQ);

	if (exp == act)
	    break;
    }
}
1057 
1058 /* finalize_exec() return value != 0 if port should remain active */
1059 static ERTS_INLINE int
finalize_exec(Port * pp,ErtsPortTask ** execq,int processing_busy_q)1060 finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
1061 {
1062     erts_aint32_t act;
1063     unsigned int prof_runnable_ports;
1064 
1065     if (!processing_busy_q)
1066 	pp->sched.taskq.local.first = *execq;
1067     else {
1068 	pp->sched.taskq.local.busy.first = *execq;
1069 	ASSERT(*execq);
1070     }
1071 
1072     ERTS_PT_DBG_CHK_TASK_QS(pp, *execq, processing_busy_q);
1073 
1074     *execq = NULL;
1075 
1076     act = erts_atomic32_read_nob(&pp->sched.flags);
1077     if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
1078 	act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);
1079 
1080     prof_runnable_ports = erts_system_profile_flags.runnable_ports;
1081     if (prof_runnable_ports)
1082 	erts_port_task_sched_lock(&pp->sched);
1083 
1084     while (1) {
1085 	erts_aint32_t new, exp;
1086 
1087 	new = exp = act;
1088 
1089 	new &= ~ERTS_PTS_FLG_EXEC;
1090 	if (act & ERTS_PTS_FLG_HAVE_TASKS)
1091 	    new |= ERTS_PTS_FLG_IN_RUNQ;
1092 
1093 	act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);
1094 
1095 	ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ));
1096 	ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_EXEC_IMM));
1097 
1098 	if (exp == act)
1099 	    break;
1100     }
1101 
1102     if (prof_runnable_ports | IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
1103 	/* trace port scheduling, out */
1104 	if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS))
1105 	    trace_sched_ports(pp, am_out);
1106 	if (prof_runnable_ports) {
1107 	    if (!(act & (ERTS_PTS_FLG_EXEC_IMM|ERTS_PTS_FLG_HAVE_TASKS)))
1108 		profile_runnable_port(pp, am_inactive);
1109 	    erts_port_task_sched_unlock(&pp->sched);
1110 	}
1111     }
1112 
1113     return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0;
1114 }
1115 
/*
 * (Re)select which queue to execute from, based on the port's current
 * busy state. When the port is busy, execution must continue on the
 * normal queue (busy-waiting tasks stay parked); when it is not busy
 * and busy-queued tasks exist, execution switches to the busy queue.
 * Returns the (possibly updated) sched flags.
 */
static ERTS_INLINE erts_aint32_t
select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    erts_aint32_t flags = erts_atomic32_read_nob(&pp->sched.flags);

    if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
	flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    if (flags & ERTS_PTS_FLG_BUSY_PORT) {
	if (*processing_busy_q_p) {
	    /* Port became busy while we were processing the busy
	     * queue; park the rest of it and fall back to the normal
	     * queue. */
	    ErtsPortTask *ptp;

	    ptp = pp->sched.taskq.local.busy.first = *execqp;
	    if (!ptp)
		pp->sched.taskq.local.busy.last = NULL;
	    else if (!(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY))
		no_sig_dep_move_from_busyq(pp);

	    *execqp = pp->sched.taskq.local.first;
	    *processing_busy_q_p = 0;

	    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
	}

	return flags;
    }

    /* Not busy */

    if (!*processing_busy_q_p && pp->sched.taskq.local.busy.first) {
	/* Busy-queued tasks must run first to preserve ordering */
	pp->sched.taskq.local.first = *execqp;
	*execqp = pp->sched.taskq.local.busy.first;
	*processing_busy_q_p = 1;

	ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
    }

    return flags;
}
1157 
1158 /*
1159  * check_task_for_exec() returns a value !0 if the task
1160  * is ok to execute; otherwise 0.
1161  */
/*
 * check_task_for_exec() returns a value !0 if the task
 * is ok to execute; otherwise 0.
 *
 * A task popped from the normal queue may instead be moved to the
 * busy queue (port busy and task waits on busy, or the task has a
 * signal-order dependency on a busy-queued task), in which case 0 is
 * returned. A task popped from the busy queue is always executable;
 * popping it may also switch execution back to the normal queue when
 * the busy queue becomes empty.
 */
static ERTS_INLINE int
check_task_for_exec(Port *pp,
		    erts_aint32_t flags,
		    ErtsPortTask **execqp,
		    int *processing_busy_q_p,
		    ErtsPortTask *ptp)
{

    if (!*processing_busy_q_p) {
	/* Processing normal queue */

	ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);

	if ((flags & ERTS_PTS_FLG_BUSY_PORT)
	    && (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) {

	    /* Task must wait until the port is no longer busy */
	    busy_wait_move_to_busy_queue(pp, ptp);
	    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

	    return 0;
	}

	if (pp->sched.taskq.local.busy.last
	    && (ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP)) {

	    /* Preserve signal order w.r.t. busy-queued tasks from
	     * the same caller */
	    int res = !check_sig_dep_move_to_busy_queue(pp, ptp);
	    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

	    return res;
	}

    }
    else {
	/* Processing busy queue */

	ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT));

	ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);

	popped_from_busy_queue(pp, ptp, !*execqp);

	if (!*execqp) {
	    /* Busy queue drained; continue with the normal queue */
	    *execqp = pp->sched.taskq.local.first;
	    *processing_busy_q_p = 0;
	}

	ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    }

    return 1;
}
1214 
/*
 * Drain the shared "in" queue under the sched lock: return its first
 * task and hand the rest to the caller through `*execqp`. When the in
 * queue is empty, clear ERTS_PTS_FLG_HAVE_TASKS and return NULL. Also
 * reaps any freeable nosuspend handles while the lock is held.
 */
static ErtsPortTask *
fetch_in_queue(Port *pp, ErtsPortTask **execqp)
{
    ErtsPortTask *ptp;
    ErtsPortTaskHandleList *free_nshp = NULL;

    erts_port_task_sched_lock(&pp->sched);

    ptp = pp->sched.taskq.in.first;
    pp->sched.taskq.in.first = NULL;
    pp->sched.taskq.in.last = NULL;
    if (ptp)
	*execqp = ptp->u.alive.next;
    else
	erts_atomic32_read_band_nob(&pp->sched.flags,
					~ERTS_PTS_FLG_HAVE_TASKS);


    if (pp->sched.taskq.local.busy.nosuspend)
	free_nshp = get_free_nosuspend_handles(pp);

    erts_port_task_sched_unlock(&pp->sched);

    /* Free outside the lock */
    if (free_nshp)
	free_nosuspend_handles(free_nshp);

    return ptp;
}
1243 
1244 static ERTS_INLINE ErtsPortTask *
select_task_for_exec(Port * pp,ErtsPortTask ** execqp,int * processing_busy_q_p)1245 select_task_for_exec(Port *pp,
1246 		     ErtsPortTask **execqp,
1247 		     int *processing_busy_q_p)
1248 {
1249     ErtsPortTask *ptp;
1250     erts_aint32_t flags;
1251 
1252     flags = select_queue_for_exec(pp, execqp, processing_busy_q_p);
1253 
1254     while (1) {
1255 	ptp = *execqp;
1256 	if (ptp)
1257 	    *execqp = ptp->u.alive.next;
1258 	else {
1259 	    ptp = fetch_in_queue(pp, execqp);
1260 	    if (!ptp)
1261 		return NULL;
1262 	}
1263 	if (check_task_for_exec(pp, flags, execqp, processing_busy_q_p, ptp))
1264 	    return ptp;
1265     }
1266 }
1267 
1268 /*
1269  * Cut time slice
1270  */
1271 
1272 int
erl_drv_consume_timeslice(ErlDrvPort dprt,int percent)1273 erl_drv_consume_timeslice(ErlDrvPort dprt, int percent)
1274 {
1275     Port *pp = erts_drvport2port(dprt);
1276     if (pp == ERTS_INVALID_ERL_DRV_PORT)
1277 	return -1;
1278     if (percent < 1)
1279 	percent = 1;
1280     else if (100 < percent)
1281 	percent = 100;
1282     pp->reds += percent*((CONTEXT_REDS+99)/100);
1283     if (pp->reds < CONTEXT_REDS)
1284 	return 0;
1285     pp->reds = CONTEXT_REDS;
1286     return 1;
1287 }
1288 
/*
 * Detach a temporary task handle (used for non-nosuspend proc-to-port
 * signals). Must be called while thread progress is delayed, since
 * the handle may be read concurrently by unmanaged threads.
 */
void
erts_port_task_tmp_handle_detach(ErtsPortTaskHandle *pthp)
{
    ERTS_LC_ASSERT(erts_thr_progress_lc_is_delaying());
    reset_port_task_handle(pthp);
}
1295 
1296 /*
1297  * Abort a scheduled task.
1298  */
1299 
/*
 * Abort a scheduled task identified by its handle. Returns 0 when the
 * task was successfully moved SCHEDULED -> ABORTED, and -1 when there
 * was no task behind the handle or it was already aborted, executing,
 * or executed. Thread progress is delayed around the handle lookup so
 * the task memory cannot be reclaimed underneath us.
 */
int
erts_port_task_abort(ErtsPortTaskHandle *pthp)
{
    int res;
    ErtsPortTask *ptp;
    ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();

    ptp = handle2task(pthp);
    if (!ptp)
	res = -1;
    else {
	erts_aint32_t old_state;

#ifdef DEBUG
	ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle;
	ERTS_THR_READ_MEMORY_BARRIER;
	old_state = erts_atomic32_read_nob(&ptp->state);
	if (old_state == ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(!saved_pthp || saved_pthp == pthp);
	}
#endif

	/* Only the thread that wins this transition owns the abort */
	old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
						  ERTS_PT_STATE_ABORTED,
						  ERTS_PT_STATE_SCHEDULED);
	if (old_state != ERTS_PT_STATE_SCHEDULED)
	    res = - 1; /* Task already aborted, executing, or executed */
	else {
	    reset_port_task_handle(pthp);

#if ERTS_POLL_USE_SCHEDULER_POLLING
	    /* An aborted I/O task will never be executed; undo its
	     * contribution to the outstanding-I/O-task counter. */
            switch (ptp->type) {
	    case ERTS_PORT_TASK_INPUT:
	    case ERTS_PORT_TASK_OUTPUT:
                if (ptp->u.alive.td.io.is_scheduler_event) {
                    ASSERT(erts_atomic_read_nob(
                               &erts_port_task_outstanding_io_tasks) > 0);
                    erts_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
                }
		break;
	    default:
		break;
	    }
#endif

	    res = 0;
	}
    }

    erts_thr_progress_unmanaged_continue(dhndl);

    return res;
}
1353 
/*
 * Abort all nosuspend tasks of a port. Unlinks the whole
 * busy-nosuspend handle list under the sched lock and then walks it,
 * aborting each task that is still in SCHEDULED state. Thread
 * progress is delayed (for unmanaged threads) around each handle
 * dereference; the task type/data are copied out before the delay is
 * released so abort_nosuspend_task() can run safely afterwards.
 */
void
erts_port_task_abort_nosuspend_tasks(Port *pp)
{
    ErtsPortTaskHandleList *abort_list;
    ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;

    erts_port_task_sched_lock(&pp->sched);
    erts_atomic32_read_band_nob(&pp->sched.flags,
				    ~ERTS_PTS_FLG_HAVE_NS_TASKS);
    abort_list = pp->sched.taskq.local.busy.nosuspend;
    pp->sched.taskq.local.busy.nosuspend = NULL;
    erts_port_task_sched_unlock(&pp->sched);

    while (abort_list) {
#ifdef DEBUG
	ErtsPortTaskHandle *saved_pthp;
#endif
	ErtsPortTaskType type;
	ErtsPortTaskTypeData td;
	ErtsPortTaskHandle *pthp;
	ErtsPortTask *ptp;
	ErtsPortTaskHandleList *pthlp;
	erts_aint32_t old_state;

	pthlp = abort_list;
	abort_list = pthlp->u.next;

	if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	    dhndl = erts_thr_progress_unmanaged_delay();

	pthp = &pthlp->handle;
	ptp = handle2task(pthp);
	if (!ptp) {
	    /* Task already gone; just free the handle element */
	    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
		erts_thr_progress_unmanaged_continue(dhndl);
	    schedule_port_task_handle_list_free(pthlp);
	    continue;
	}

#ifdef DEBUG
	saved_pthp = ptp->u.alive.handle;
	ERTS_THR_READ_MEMORY_BARRIER;
	old_state = erts_atomic32_read_nob(&ptp->state);
	if (old_state == ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(saved_pthp == pthp);
	}
#endif

	old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
						  ERTS_PT_STATE_ABORTED,
						  ERTS_PT_STATE_SCHEDULED);
	if (old_state != ERTS_PT_STATE_SCHEDULED) {
	    /* Task already aborted, executing, or executed */
	    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
		erts_thr_progress_unmanaged_continue(dhndl);
	    schedule_port_task_handle_list_free(pthlp);
	    continue;
	}

	reset_port_task_handle(pthp);

	/* Copy out before releasing the thread-progress delay */
	type = ptp->type;
	td = ptp->u.alive.td;

	if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	    erts_thr_progress_unmanaged_continue(dhndl);
	schedule_port_task_handle_list_free(pthlp);

	abort_nosuspend_task(pp, type, &td, pp->sched.taskq.bpq != NULL);
    }
}
1425 
1426 /*
1427  * Schedule a task.
1428  */
1429 
/*
 * Schedule a task on the port identified by `id`.
 *
 * Varargs depend on `type`:
 *  - ERTS_PORT_TASK_INPUT/OUTPUT: ErlDrvEvent [, int is_scheduler_event]
 *  - ERTS_PORT_TASK_PROC_SIG: ErtsProc2PortSigData *,
 *    ErtsProc2PortSigCallback, int extra flags
 *
 * Enqueues the task on the port's in queue and, when the port is
 * neither already in a run queue nor executing, enqueues the port on
 * a run queue (possibly emigrating it first). Returns 0 on success
 * and -1 on failure (e.g. port not found or exiting). May be called
 * from unmanaged threads; thread progress delay + port refc keep the
 * port alive in that case.
 */
int
erts_port_task_schedule(Eterm id,
			ErtsPortTaskHandle *pthp,
			ErtsPortTaskType type,
			...)
{
    ErtsProc2PortSigData *sigdp = NULL;
    ErtsPortTaskHandleList *ns_pthlp = NULL;
    ErtsRunQueue *xrunq;
    ErtsThrPrgrDelayHandle dhndl;
    ErtsRunQueue *runq;
    Port *pp;
    ErtsPortTask *ptp = NULL;
    erts_aint32_t act, add_flags;
    unsigned int prof_runnable_ports;

    ERTS_LC_ASSERT(!pthp || !erts_port_task_is_scheduled(pthp));

    ASSERT(is_internal_port(id));

    dhndl = erts_thr_progress_unmanaged_delay();

    pp = erts_port_lookup_raw(id);

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
	/* Unmanaged thread: pin the port with a refc instead of
	 * relying on thread progress. */
	if (pp)
	    erts_port_inc_refc(pp);
	erts_thr_progress_unmanaged_continue(dhndl);
    }

    if (type != ERTS_PORT_TASK_PROC_SIG) {
        if (!pp)
            goto fail;

	ptp = port_task_alloc();

	ptp->type = type;
	ptp->u.alive.flags = 0;

	erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);

	set_handle(ptp, pthp);
    }

    /* Unpack type-specific varargs and finish task setup */
    switch (type) {
    case ERTS_PORT_TASK_INPUT:
    case ERTS_PORT_TASK_OUTPUT: {
	va_list argp;
	va_start(argp, type);
	ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
#if ERTS_POLL_USE_SCHEDULER_POLLING
        ptp->u.alive.td.io.is_scheduler_event = va_arg(argp, int);
#endif
	va_end(argp);
#if ERTS_POLL_USE_SCHEDULER_POLLING
        if (ptp->u.alive.td.io.is_scheduler_event)
            erts_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
#endif
	break;
    }
    case ERTS_PORT_TASK_PROC_SIG: {
	/* Task storage is embedded in the signal data for proc sigs */
	va_list argp;
	va_start(argp, type);
	sigdp = va_arg(argp, ErtsProc2PortSigData *);
	ptp = p2p_sig_data_to_task(sigdp);
	ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
 	ptp->u.alive.flags |= va_arg(argp, int);
	va_end(argp);
        if (!pp)
            goto fail;

	if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND))
	    set_tmp_handle(ptp, pthp);
	else {
	    /* Nosuspend tasks need a persistent handle so they can be
	     * aborted when the port gets busy. */
	    ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
				  sizeof(ErtsPortTaskHandleList));
	    set_handle(ptp, &ns_pthlp->handle);
	}
	break;
    }
    default:
	break;
    }

    if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
	reset_handle(ptp);
	if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
	    goto abort_nosuspend;
	else
	    goto fail;
    }

    add_flags = ERTS_PTS_FLG_HAVE_TASKS;
    if (ns_pthlp)
	add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS;

    prof_runnable_ports = erts_system_profile_flags.runnable_ports;
    if (prof_runnable_ports)
	erts_port_task_sched_lock(&pp->sched);

    /* CAS loop: publish the new flags; claim IN_RUNQ only if the port
     * is neither enqueued nor executing already. */
    while (1) {
	erts_aint32_t new, exp;

	if ((act & add_flags) == add_flags
	    && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	    goto done; /* Done */

	new = exp = act;
	new |= add_flags;
	if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	    new |= ERTS_PTS_FLG_IN_RUNQ;

	act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);

	if (exp == act) {
	    if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
		break; /* Need to enqueue port */
	    goto done; /* Done */
	}

	if (act & ERTS_PTS_FLG_EXIT)
	    goto done; /* Died after our task insert... */
    }

    if (prof_runnable_ports) {
	if (!(act & ERTS_PTS_FLG_EXEC_IMM))
	    profile_runnable_port(pp, am_active);
	erts_port_task_sched_unlock(&pp->sched);
	prof_runnable_ports = 0;
    }

    /* Enqueue port on run-queue */

    runq = erts_port_runq(pp);

    xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
    ERTS_LC_ASSERT(runq != xrunq);
    ERTS_LC_VERIFY_RQ(runq, pp);
    if (xrunq) {
	/* Emigrate port ... */
        erts_set_runq_port(pp, xrunq);
	erts_runq_unlock(runq);
	runq = erts_port_runq(pp);
    }

    enqueue_port(runq, pp);

    erts_runq_unlock(runq);

    erts_notify_inc_runq(runq);

done:

    if (prof_runnable_ports)
	erts_port_task_sched_unlock(&pp->sched);

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);

    return 0;

abort_nosuspend:

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);

    abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td, 0);

    ASSERT(ns_pthlp);
    erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);

    ASSERT(ptp);
    port_task_free(ptp);

    return 0;

fail:

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
	erts_port_dec_refc(pp);

    if (ptp) {
        if (ptp->type == ERTS_PORT_TASK_PROC_SIG)
            abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT,
                              ptp->type, &ptp->u.alive.td, 0);
	port_task_free(ptp);
    }

    if (ns_pthlp)
	erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);

    return -1;
}
1623 
/*
 * Mark a port as free/exiting: set ERTS_PTS_FLG_EXIT on the sched
 * flags and transition the port state to FREE. If the port is neither
 * in a run queue nor executing, its remaining tasks are cleaned up
 * immediately; otherwise the executing/dequeuing scheduler will do it.
 * Caller must hold the port lock.
 */
void
erts_port_task_free_port(Port *pp)
{
    erts_aint32_t flags;
    ErtsRunQueue *runq;

    ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
    ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));

    runq = erts_port_runq(pp);
    erts_port_task_sched_lock(&pp->sched);
    flags = erts_atomic32_read_bor_relb(&pp->sched.flags,
					    ERTS_PTS_FLG_EXIT);
    erts_port_task_sched_unlock(&pp->sched);
    erts_atomic32_read_bset_relb(&pp->state,
				 (ERTS_PORT_SFLG_CONNECTED
				  | ERTS_PORT_SFLG_EXITING
				  | ERTS_PORT_SFLG_CLOSING
				  | ERTS_PORT_SFLG_FREE),
				 ERTS_PORT_SFLG_FREE);

    erts_runq_unlock(runq);

    if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
	begin_port_cleanup(pp, NULL, NULL);
}
1650 
1651 /*
1652  * Execute scheduled tasks of a port.
1653  *
1654  * erts_port_task_execute() is called by scheduler threads between
1655  * scheduling of processes. Run-queue lock should be held by caller.
1656  */
1657 
/*
 * Pop one port off the run queue and execute its tasks until either
 * the task queues are exhausted, the port's reduction budget
 * (CONTEXT_REDS) is consumed, or the port is freed. On return,
 * `*curr_port_pp` has been set to the executing port for the duration
 * and reset to NULL again. Re-enqueues the port (possibly on another
 * run queue via emigration) when tasks remain.
 */
void
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
    Port *pp;
    ErtsPortTask *execq;
    int processing_busy_q;
    int vreds = 0;
    int reds = 0;
    int fpe_was_unmasked;
    erts_aint32_t state;
    int active;
    Uint64 start_time = 0;
    ErtsSchedulerData *esdp = runq->scheduler;
#if ERTS_POLL_USE_SCHEDULER_POLLING
    erts_aint_t io_tasks_executed = 0;
#endif
    ERTS_MSACC_PUSH_STATE_M();

    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));

    pp = pop_port(runq);
    if (!pp) {
	goto done;
    }

    ERTS_LC_VERIFY_RQ(runq, pp);

    erts_runq_unlock(runq);

    *curr_port_pp = pp;

    /* Scheduling statistics: track executions and migrations */
    if (erts_sched_stat.enabled) {
	Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no);
	int migrated = old && old != esdp->no;

	erts_spin_lock(&erts_sched_stat.lock);
	erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++;
	erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++;
	if (migrated) {
	    erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++;
	    erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++;
	}
	erts_spin_unlock(&erts_sched_stat.lock);
    }

    prepare_exec(pp, &execq, &processing_busy_q);

    erts_port_lock(pp);

    /* trace port scheduling, in */
    if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
	trace_sched_ports(pp, am_in);
    }

    fpe_was_unmasked = erts_block_fpe();

    state = erts_atomic32_read_nob(&pp->state);
    pp->reds = ERTS_PORT_REDS_EXECUTE;
    ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
    /* Jump into the loop to check FREE state before the first task */
    goto begin_handle_tasks;

    while (1) {
	erts_aint32_t task_state;
	ErtsPortTask *ptp;

	ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
	if (!ptp)
	    break;

	/* Claim the task; a concurrent abort may win the race */
	task_state = erts_atomic32_cmpxchg_nob(&ptp->state,
						   ERTS_PT_STATE_EXECUTING,
						   ERTS_PT_STATE_SCHEDULED);
	if (task_state != ERTS_PT_STATE_SCHEDULED) {
	    ASSERT(task_state == ERTS_PT_STATE_ABORTED);
	    goto aborted_port_task;
	}

	if (erts_system_monitor_long_schedule != 0) {
	    start_time = erts_timestamp_millis();
	}

	ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
	ERTS_CHK_NO_PROC_LOCKS;
	ASSERT(pp->drv_ptr);

	/* Dispatch on task type; each case sets `reds` */
	switch (ptp->type) {
	case ERTS_PORT_TASK_TIMEOUT:
	    reset_handle(ptp);
	    if (!ERTS_PTMR_IS_TIMED_OUT(pp))
		reds = 0;
	    else {
		ERTS_PTMR_CLEAR(pp);
		reds = ERTS_PORT_REDS_TIMEOUT;
		if (!(state & ERTS_PORT_SFLGS_DEAD)) {
		    DTRACE_DRIVER(driver_timeout, pp);
		    LTTNG_DRIVER(driver_timeout, pp);
                    if (IS_TRACED_FL(pp, F_TRACE_RECEIVE))
                        trace_port(pp, am_receive, am_timeout);
		    (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
		}
	    }
	    break;
	case ERTS_PORT_TASK_INPUT:
	    reds = ERTS_PORT_REDS_INPUT;
	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_input, pp);
            LTTNG_DRIVER(driver_ready_input, pp);
	    /* NOTE some windows drivers use ->ready_input
	       for input and output */
	    (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data,
					ptp->u.alive.td.io.event);
	    reset_executed_io_task_handle(pp, ptp);
#if ERTS_POLL_USE_SCHEDULER_POLLING
            if (ptp->u.alive.td.io.is_scheduler_event)
                io_tasks_executed++;
#endif
	    break;
	case ERTS_PORT_TASK_OUTPUT:
	    reds = ERTS_PORT_REDS_OUTPUT;
	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_output, pp);
            LTTNG_DRIVER(driver_ready_output, pp);
	    (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
					 ptp->u.alive.td.io.event);
	    reset_executed_io_task_handle(pp, ptp);
#if ERTS_POLL_USE_SCHEDULER_POLLING
            if (ptp->u.alive.td.io.is_scheduler_event)
                io_tasks_executed++;
#endif
	    break;
	case ERTS_PORT_TASK_PROC_SIG: {
	    ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
	    reset_handle(ptp);
	    ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
	    if (!pp->sched.taskq.bpq)
		reds = ptp->u.alive.td.psig.callback(pp,
						     state,
						     ERTS_PROC2PORT_SIG_EXEC,
						     sigdp);
	    else {
		/* With a busy-port queue, account the dequeued data
		 * size after the callback has consumed the signal. */
		ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
		reds = ptp->u.alive.td.psig.callback(pp,
						     state,
						     ERTS_PROC2PORT_SIG_EXEC,
						     sigdp);
		dequeued_proc2port_data(pp, size);
	    }
	    break;
	}
	case ERTS_PORT_TASK_DIST_CMD:
	    reset_handle(ptp);
	    reds = erts_dist_command(pp, CONTEXT_REDS - pp->reds);
	    break;
	default:
	    erts_exit(ERTS_ABORT_EXIT,
		     "Invalid port task type: %d\n",
		     (int) ptp->type);
	    break;
	}

	reds += erts_port_driver_callback_epilogue(pp, &state);

	/* Long-schedule monitoring */
	if (start_time != 0) {
	    Sint64 diff = erts_timestamp_millis() - start_time;
	    if (diff > 0 && (Uint) diff >  erts_system_monitor_long_schedule) {
		monitor_long_schedule_port(pp,ptp->type,(Uint) diff);
	    }
	}
	start_time = 0;

    aborted_port_task:
	schedule_port_task_free(ptp);

    begin_handle_tasks:
	if (state & ERTS_PORT_SFLG_FREE) {
	    reds += ERTS_PORT_REDS_FREE;
	    begin_port_cleanup(pp, &execq, &processing_busy_q);

	    break;
	}

	vreds += ERTS_PORT_CALLBACK_VREDS;
	reds += ERTS_PORT_CALLBACK_VREDS;

	pp->reds += reds;
	reds = 0;

	if (pp->reds >= CONTEXT_REDS)
	    break;
    }

    erts_unblock_fpe(fpe_was_unmasked);
    ERTS_MSACC_POP_STATE_M();

#if ERTS_POLL_USE_SCHEDULER_POLLING
    if (io_tasks_executed) {
        ASSERT(erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
	       >= io_tasks_executed);
        erts_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
				 -1*io_tasks_executed);
    }
#endif

    ASSERT(runq == erts_get_runq_port(pp));

    active = finalize_exec(pp, &execq, processing_busy_q);

    /* Virtual reds (callback overhead) are not charged to the scheduler */
    reds = pp->reds - vreds;

    erts_port_release(pp);

    *curr_port_pp = NULL;

    erts_runq_lock(runq);

    if (active) {
	ErtsRunQueue *xrunq;

	ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));

	xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
	ERTS_LC_ASSERT(runq != xrunq);
	ERTS_LC_VERIFY_RQ(runq, pp);
	if (!xrunq) {
	    enqueue_port(runq, pp);
	    /* No need to notify ourselves about inc in runq. */
	}
	else {
	    /* Emigrate port... */
            erts_set_runq_port(pp, xrunq);
	    erts_runq_unlock(runq);

	    xrunq = erts_port_runq(pp);
	    enqueue_port(xrunq, pp);
	    erts_runq_unlock(xrunq);
	    erts_notify_inc_runq(xrunq);

	    erts_runq_lock(runq);
	}
    }

 done:

    runq->scheduler->reductions += reds;

    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
    ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds);
}
1906 
/* Thread-progress-later callback: drop the cleanup reference that
 * keeps the port structure alive. */
static void
release_port(void *vport)
{
    erts_port_dec_refc((Port *) vport);
}
1912 
/* Aux-work callback that, from a scheduler thread, schedules the
 * actual port release as a thread-progress-later operation. */
static void
schedule_release_port(void *vport) {
  Port *pp = (Port*)vport;
  /* This is only used when a port release was ordered from a non-scheduler */
  erts_schedule_thr_prgr_later_op(release_port,
				  (void *) pp,
				  &pp->common.u.release);
}
1921 
1922 
1923 static void
begin_port_cleanup(Port * pp,ErtsPortTask ** execqp,int * processing_busy_q_p)1924 begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
1925 {
1926     int i, max;
1927     ErtsPortTaskBusyCallerTable *tabp;
1928     ErtsPortTask *qs[3];
1929     ErtsPortTaskHandleList *free_nshp = NULL;
1930     ErtsProcList *plp;
1931 
1932     ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
1933 
1934     /*
1935      * Abort remaining tasks...
1936      *
1937      * We want to process queues in the following order in order
1938      * to preserve signal ordering guarantees:
1939      *  1. Local busy queue
1940      *  2. Local queue
1941      *  3. In queue
1942      */
1943 
1944     max = 0;
1945     if (!execqp) {
1946 	if (pp->sched.taskq.local.busy.first)
1947 	    qs[max++] = pp->sched.taskq.local.busy.first;
1948 	if (pp->sched.taskq.local.first)
1949 	    qs[max++] = pp->sched.taskq.local.first;
1950     }
1951     else {
1952 	if (*processing_busy_q_p) {
1953 	    if (*execqp)
1954 		qs[max++] = *execqp;
1955 	    if (pp->sched.taskq.local.first)
1956 		qs[max++] = pp->sched.taskq.local.first;
1957 	}
1958 	else {
1959 	    if (pp->sched.taskq.local.busy.first)
1960 		qs[max++] = pp->sched.taskq.local.busy.first;
1961 	    if (*execqp)
1962 		qs[max++] = *execqp;
1963 	}
1964 	*execqp = NULL;
1965 	*processing_busy_q_p = 0;
1966     }
1967     pp->sched.taskq.local.busy.first = NULL;
1968     pp->sched.taskq.local.busy.last = NULL;
1969     pp->sched.taskq.local.first = NULL;
1970     tabp = pp->sched.taskq.local.busy.table;
1971     if (tabp) {
1972 	int bix;
1973 	for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
1974 	    ErtsPortTaskBusyCaller *bcp = tabp->bucket[bix];
1975 	    while (bcp) {
1976 		ErtsPortTaskBusyCaller *free_bcp = bcp;
1977 		bcp = bcp->next;
1978 		if (free_bcp != &tabp->pre_alloc_busy_caller)
1979 		    erts_free(ERTS_ALC_T_BUSY_CALLER, free_bcp);
1980 	    }
1981 	}
1982 
1983 	busy_caller_table_free(tabp);
1984 	pp->sched.taskq.local.busy.table = NULL;
1985     }
1986 
1987     erts_port_task_sched_lock(&pp->sched);
1988     qs[max] = pp->sched.taskq.in.first;
1989     pp->sched.taskq.in.first = NULL;
1990     pp->sched.taskq.in.last = NULL;
1991     erts_port_task_sched_unlock(&pp->sched);
1992     if (qs[max])
1993 	max++;
1994 
1995     for (i = 0; i < max; i++) {
1996 	while (1) {
1997 	    erts_aint32_t state;
1998 	    ErtsPortTask *ptp = qs[i];
1999 	    if (!ptp)
2000 		break;
2001 
2002 	    qs[i] = ptp->u.alive.next;
2003 
2004 	    /* Normal case here is aborted tasks... */
2005 	    state = erts_atomic32_read_nob(&ptp->state);
2006 	    if (state == ERTS_PT_STATE_ABORTED)
2007 		goto aborted_port_task;
2008 
2009 	    state = erts_atomic32_cmpxchg_nob(&ptp->state,
2010 						  ERTS_PT_STATE_EXECUTING,
2011 						  ERTS_PT_STATE_SCHEDULED);
2012 	    if (state != ERTS_PT_STATE_SCHEDULED) {
2013 		ASSERT(state == ERTS_PT_STATE_ABORTED);
2014 		goto aborted_port_task;
2015 	    }
2016 
2017 	    reset_handle(ptp);
2018 
2019 	    switch (ptp->type) {
2020 	    case ERTS_PORT_TASK_TIMEOUT:
2021 		break;
2022 	    case ERTS_PORT_TASK_INPUT:
2023 		erts_stale_drv_select(pp->common.id,
2024 				      ERTS_Port2ErlDrvPort(pp),
2025 				      ptp->u.alive.td.io.event,
2026 				      DO_READ,
2027 				      1);
2028 		break;
2029 	    case ERTS_PORT_TASK_OUTPUT:
2030 		erts_stale_drv_select(pp->common.id,
2031 				      ERTS_Port2ErlDrvPort(pp),
2032 				      ptp->u.alive.td.io.event,
2033 				      DO_WRITE,
2034 				      1);
2035 		break;
2036 	    case ERTS_PORT_TASK_DIST_CMD:
2037 		break;
2038 	    case ERTS_PORT_TASK_PROC_SIG: {
2039 		ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
2040 		if (!pp->sched.taskq.bpq)
2041 		    ptp->u.alive.td.psig.callback(NULL,
2042 						  ERTS_PORT_SFLG_INVALID,
2043 						  ERTS_PROC2PORT_SIG_ABORT_CLOSED,
2044 						  sigdp);
2045 		else {
2046 		    ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
2047 		    ptp->u.alive.td.psig.callback(NULL,
2048 						  ERTS_PORT_SFLG_INVALID,
2049 						  ERTS_PROC2PORT_SIG_ABORT_CLOSED,
2050 						  sigdp);
2051 		    aborted_proc2port_data(pp, size);
2052 		}
2053 		break;
2054 	    }
2055 	    default:
2056 		erts_exit(ERTS_ABORT_EXIT,
2057 			 "Invalid port task type: %d\n",
2058 			 (int) ptp->type);
2059 	    }
2060 
2061 	aborted_port_task:
2062 	    schedule_port_task_free(ptp);
2063 	}
2064     }
2065 
2066     erts_atomic32_read_band_nob(&pp->sched.flags,
2067 				    ~(ERTS_PTS_FLG_HAVE_BUSY_TASKS
2068 				      |ERTS_PTS_FLG_HAVE_TASKS
2069 				      |ERTS_PTS_FLGS_BUSY));
2070 
2071     erts_port_task_sched_lock(&pp->sched);
2072 
2073     /* Cleanup nosuspend handles... */
2074     free_nshp = (pp->sched.taskq.local.busy.nosuspend
2075 		 ? get_free_nosuspend_handles(pp)
2076 		 : NULL);
2077     ASSERT(!pp->sched.taskq.local.busy.nosuspend);
2078 
2079     /* Make sure not to leave any processes suspended on the port... */
2080     plp = pp->suspended;
2081     pp->suspended = NULL;
2082 
2083     erts_port_task_sched_unlock(&pp->sched);
2084 
2085     if (free_nshp)
2086 	free_nosuspend_handles(free_nshp);
2087 
2088     if (erts_proclist_fetch(&plp, NULL)) {
2089 #ifdef USE_VM_PROBES
2090 	if (DTRACE_ENABLED(process_port_unblocked)) {
2091 	    DTRACE_CHARBUF(port_str, 16);
2092 	    DTRACE_CHARBUF(pid_str, 16);
2093 	    ErtsProcList* plp2 = plp;
2094 
2095 	    erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", pp->common.id);
2096 	    while (plp2 != NULL) {
2097 		erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->u.pid);
2098 		DTRACE2(process_port_unblocked, pid_str, port_str);
2099 	    }
2100 	}
2101 #endif
2102 	erts_resume_processes(plp);
2103     }
2104 
2105     /*
2106      * Schedule cleanup of port structure...
2107      */
    /* We might not be running on a scheduler thread; e.g. when tracing
     * to a port we may be the sys_msg_dispatcher thread. */
2109     if (!erts_get_scheduler_data()) {
2110       erts_schedule_misc_aux_work(1, schedule_release_port, (void*)pp);
2111     } else {
2112       /* Has to be more or less immediate to release any driver */
2113       erts_schedule_thr_prgr_later_op(release_port,
2114 				      (void *) pp,
2115 				      &pp->common.u.release);
2116     }
2117 }
2118 
2119 
2120 void
erts_enqueue_port(ErtsRunQueue * rq,Port * pp)2121 erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
2122 {
2123     ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
2124     ASSERT(rq == erts_get_runq_port(pp));
2125     ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ);
2126     enqueue_port(rq, pp);
2127 }
2128 
2129 Port *
erts_dequeue_port(ErtsRunQueue * rq)2130 erts_dequeue_port(ErtsRunQueue *rq)
2131 {
2132     Port *pp;
2133     ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
2134     pp = pop_port(rq);
2135     ASSERT(!pp || rq == erts_get_runq_port(pp));
2136     ASSERT(!pp || (erts_atomic32_read_nob(&pp->sched.flags)
2137 		   & ERTS_PTS_FLG_IN_RUNQ));
2138     return pp;
2139 }
2140 
2141 
2142 /*
2143  * Initialize the module.
2144  */
2145 void
erts_port_task_init(void)2146 erts_port_task_init(void)
2147 {
2148 #if ERTS_POLL_USE_SCHEDULER_POLLING
2149     erts_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
2150                          (erts_aint_t) 0);
2151 #endif
2152     init_port_task_alloc(erts_no_schedulers + erts_no_poll_threads
2153                          + 1); /* aux_thread */
2154     init_busy_caller_table_alloc();
2155 }
2156