1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 2006-2020. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /*
22 * Description: Scheduling of port tasks
23 *
24 * Author: Rickard Green
25 */
26
27 #define ERL_PORT_TASK_C__
28
29 #ifdef HAVE_CONFIG_H
30 # include "config.h"
31 #endif
32
33 #include "global.h"
34 #include "erl_port_task.h"
35 #include "dist.h"
36 #include "erl_check_io.h"
37 #include "dtrace-wrapper.h"
38 #include "lttng-wrapper.h"
39 #include "erl_check_io.h"
40 #include <stdarg.h>
41
42 /*
43 * ERTS_PORT_CALLBACK_VREDS: Limit the amount of callback calls we do...
44 */
45 #define ERTS_PORT_CALLBACK_VREDS (CONTEXT_REDS/20)
46
47 #if defined(DEBUG) && 0
48 #define ERTS_HARD_DEBUG_TASK_QUEUES
49 #else
50 #undef ERTS_HARD_DEBUG_TASK_QUEUES
51 #endif
52
53 #ifdef ERTS_HARD_DEBUG_TASK_QUEUES
54 static void chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue);
55 #define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ) \
56 chk_task_queues((PP), (EQ), (PBQ))
57 #else
58 #define ERTS_PT_DBG_CHK_TASK_QS(PP, EQ, PBQ)
59 #endif
60
61 #ifdef USE_VM_PROBES
62 #define DTRACE_DRIVER(PROBE_NAME, PP) \
63 if (DTRACE_ENABLED(PROBE_NAME)) { \
64 DTRACE_CHARBUF(process_str, DTRACE_TERM_BUF_SIZE); \
65 DTRACE_CHARBUF(port_str, DTRACE_TERM_BUF_SIZE); \
66 \
67 dtrace_pid_str(ERTS_PORT_GET_CONNECTED(PP), process_str); \
68 dtrace_port_str(PP, port_str); \
69 DTRACE3(PROBE_NAME, process_str, port_str, PP->name); \
70 }
71 #else
72 #define DTRACE_DRIVER(PROBE_NAME, PP) do {} while(0)
73 #endif
74 #ifdef USE_LTTNG_VM_TRACEPOINTS
75 #define LTTNG_DRIVER(TRACEPOINT, PP) \
76 if (LTTNG_ENABLED(TRACEPOINT)) { \
77 lttng_decl_portbuf(port_str); \
78 lttng_decl_procbuf(proc_str); \
79 lttng_pid_to_str(ERTS_PORT_GET_CONNECTED(PP), proc_str); \
80 lttng_port_to_str((PP), port_str); \
81 LTTNG3(TRACEPOINT, proc_str, port_str, (PP)->name); \
82 }
83 #else
84 #define LTTNG_DRIVER(TRACEPOINT, PP) do {} while(0)
85 #endif
86
/*
 * Lock-checker verification that RQ is locked and is the run queue
 * the port PP is currently assigned to.
 *
 * Fix: the expansion previously referenced the caller's local `runq`
 * instead of the RQ argument; use (RQ) so the macro is hygienic and
 * works regardless of the caller's variable name.
 */
#define ERTS_LC_VERIFY_RQ(RQ, PP) \
    do { \
        ERTS_LC_ASSERT(erts_lc_runq_is_locked((RQ))); \
        ERTS_LC_ASSERT((RQ) == erts_get_runq_port((PP))); \
    } while (0)
92
93 #define ERTS_PT_STATE_SCHEDULED 0
94 #define ERTS_PT_STATE_ABORTED 1
95 #define ERTS_PT_STATE_EXECUTING 2
96
/*
 * Type-specific payload of a port task.
 */
typedef union {
    struct { /* I/O tasks */
        ErlDrvEvent event;          /* driver event to service */
#if ERTS_POLL_USE_SCHEDULER_POLLING
        int is_scheduler_event;     /* event belongs to the scheduler pollset */
#endif
    } io;
    struct { /* proc-to-port signal tasks */
        ErtsProc2PortSigCallback callback;  /* invoked on execute/abort */
        ErtsProc2PortSigData data;          /* signal payload */
    } psig;
} ErtsPortTaskTypeData;

/*
 * A scheduled unit of work for a port.
 */
struct ErtsPortTask_ {
    erts_atomic32_t state;      /* ERTS_PT_STATE_* */
    ErtsPortTaskType type;
    union {
        struct {
            ErtsPortTask *next;             /* next task in queue */
            ErtsPortTaskHandle *handle;     /* back-pointer used for abort; may be NULL */
            int flags;                      /* ERTS_PT_FLG_* */
            Uint32 ref[ERTS_MAX_REF_NUMBERS];
            ErtsPortTaskTypeData td;        /* type-specific data */
        } alive;
        ErtsThrPrgrLaterOp release;  /* used while awaiting deallocation */
    } u;
};

/*
 * List node tracking a task handle for no-suspend signal tasks.
 */
struct ErtsPortTaskHandleList_ {
    ErtsPortTaskHandle handle;
    union {
        ErtsPortTaskHandleList *next;   /* while linked on the port */
        ErtsThrPrgrLaterOp release;     /* while awaiting deallocation */
    } u;
};

typedef struct ErtsPortTaskBusyCaller_ ErtsPortTaskBusyCaller;
/*
 * Per-caller bookkeeping for tasks parked in the busy queue.
 */
struct ErtsPortTaskBusyCaller_ {
    ErtsPortTaskBusyCaller *next;   /* hash-bucket chain */
    Eterm caller;                   /* pid/port that issued the tasks */
    SWord count;                    /* number of its tasks in the busy queue */
    ErtsPortTask *last;             /* its last task in the busy queue */
};

/* Number of hash buckets in the busy caller table. */
#define ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS 17
/*
 * Hash table over callers that have tasks in the busy queue. One
 * entry is pre-allocated to avoid heap allocation in the common
 * single-caller case.
 */
struct ErtsPortTaskBusyCallerTable_ {
    ErtsPortTaskBusyCaller *bucket[ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS];
    ErtsPortTaskBusyCaller pre_alloc_busy_caller;
};
146
147 #if ERTS_POLL_USE_SCHEDULER_POLLING
148 erts_atomic_t erts_port_task_outstanding_io_tasks;
149 #endif
150
151 static void begin_port_cleanup(Port *pp,
152 ErtsPortTask **execq,
153 int *processing_busy_q_p);
154
155 ERTS_THR_PREF_QUICK_ALLOC_IMPL(port_task,
156 ErtsPortTask,
157 1000,
158 ERTS_ALC_T_PORT_TASK)
159
160 ERTS_SCHED_PREF_QUICK_ALLOC_IMPL(busy_caller_table,
161 ErtsPortTaskBusyCallerTable,
162 50,
163 ERTS_ALC_T_BUSY_CALLER_TAB)
164
165 static void
call_port_task_free(void * vptp)166 call_port_task_free(void *vptp)
167 {
168 port_task_free((ErtsPortTask *) vptp);
169 }
170
/*
 * Free a port task once all managed threads have made thread
 * progress; until then other threads may still hold references to it.
 */
static ERTS_INLINE void
schedule_port_task_free(ErtsPortTask *ptp)
{
    erts_schedule_thr_prgr_later_cleanup_op(call_port_task_free,
                                            (void *) ptp,
                                            &ptp->u.release,
                                            sizeof(ErtsPortTask));
}
179
180 static ERTS_INLINE ErtsPortTask *
p2p_sig_data_to_task(ErtsProc2PortSigData * sigdp)181 p2p_sig_data_to_task(ErtsProc2PortSigData *sigdp)
182 {
183 ErtsPortTask *ptp;
184 char *ptr = (char *) sigdp;
185 ptr -= offsetof(ErtsPortTask, u.alive.td.psig.data);
186 ptp = (ErtsPortTask *) ptr;
187 ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
188 return ptp;
189 }
190
191 static ERTS_INLINE ErtsProc2PortSigData *
p2p_sig_data_init(ErtsPortTask * ptp)192 p2p_sig_data_init(ErtsPortTask *ptp)
193 {
194
195 ptp->type = ERTS_PORT_TASK_PROC_SIG;
196 ptp->u.alive.flags = ERTS_PT_FLG_SIG_DEP;
197 erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);
198
199 ASSERT(ptp == p2p_sig_data_to_task(&ptp->u.alive.td.psig.data));
200
201 return &ptp->u.alive.td.psig.data;
202 }
203
204 ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data(void)205 erts_port_task_alloc_p2p_sig_data(void)
206 {
207 ErtsPortTask *ptp = port_task_alloc();
208
209 return p2p_sig_data_init(ptp);
210 }
211
212 ErtsProc2PortSigData *
erts_port_task_alloc_p2p_sig_data_extra(size_t extra,void ** extra_ptr)213 erts_port_task_alloc_p2p_sig_data_extra(size_t extra, void **extra_ptr)
214 {
215 ErtsPortTask *ptp = erts_alloc(ERTS_ALC_T_PORT_TASK,
216 sizeof(ErtsPortTask) + extra);
217
218 *extra_ptr = ptp+1;
219
220 return p2p_sig_data_init(ptp);
221 }
222
223 void
erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData * sigdp)224 erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData *sigdp)
225 {
226 schedule_port_task_free(p2p_sig_data_to_task(sigdp));
227 }
228
229 static ERTS_INLINE Eterm
task_caller(ErtsPortTask * ptp)230 task_caller(ErtsPortTask *ptp)
231 {
232 Eterm caller;
233
234 ASSERT(ptp->type == ERTS_PORT_TASK_PROC_SIG);
235
236 caller = ptp->u.alive.td.psig.data.caller;
237
238 ASSERT(is_internal_pid(caller) || is_internal_port(caller));
239
240 return caller;
241 }
242
243 /*
244 * Busy queue management
245 */
246
/* Map a caller id (internal pid or port) to a busy caller table
 * bucket index. */
static ERTS_INLINE int
caller2bix(Eterm caller)
{
    ASSERT(is_internal_pid(caller) || is_internal_port(caller));
    return (int) (_GET_PID_DATA(caller) % ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS);
}
253
254
/*
 * Update the busy caller table after a task has been popped from the
 * busy queue: decrement the caller's count and, when its last task is
 * gone, unlink (and possibly free) the caller entry. When the popped
 * task was the last one in the whole busy queue (`last` != 0), clear
 * ERTS_PTS_FLG_HAVE_BUSY_TASKS and release the table.
 */
static void
popped_from_busy_queue(Port *pp, ErtsPortTask *ptp, int last)
{
    ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp;
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    Eterm caller = task_caller(ptp);
    int bix = caller2bix(caller);

    ASSERT(is_internal_pid(caller));

    ASSERT(tabp);
    bcp = tabp->bucket[bix];
    prev_bcpp = &tabp->bucket[bix];
    ASSERT(bcp);
    /* Locate this caller's entry in the bucket chain. */
    while (bcp->caller != caller) {
        prev_bcpp = &bcp->next;
        bcp = bcp->next;
        ASSERT(bcp);
    }
    ASSERT(bcp->count > 0);
    if (--bcp->count != 0) {
        ASSERT(!last); /* caller still has tasks in the busy queue */
    }
    else {
        /* Caller's last busy task: unlink its table entry. */
        *prev_bcpp = bcp->next;
        if (bcp == &tabp->pre_alloc_busy_caller)
            bcp->caller = am_undefined; /* mark pre-allocated entry as free */
        else
            erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
        if (last) {
            /* Whole busy queue drained: drop flag, free table. */
#ifdef DEBUG
            erts_aint32_t flags =
#endif
                erts_atomic32_read_band_nob(
                    &pp->sched.flags,
                    ~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
            ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
#ifdef DEBUG
            /* An empty busy queue implies an empty caller table. */
            for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
                ASSERT(!tabp->bucket[bix]);
            }
#endif
            busy_caller_table_free(tabp);
            pp->sched.taskq.local.busy.first = NULL;
            pp->sched.taskq.local.busy.last = NULL;
            pp->sched.taskq.local.busy.table = NULL;
        }
    }
}
304
/*
 * Append a task that must wait for the port to become non-busy to the
 * busy queue, creating the busy caller table on first use, and record
 * the caller in the table so later signal-order checks can find it.
 */
static void
busy_wait_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    Eterm caller = task_caller(ptp);
    ErtsPortTaskBusyCaller *bcp;
    int bix;

    ASSERT(is_internal_pid(caller));
    /*
     * Port is busy and this task type needs to wait until not busy.
     */

    ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY);

    /* Link the task last on the busy queue. */
    ptp->u.alive.next = NULL;
    if (pp->sched.taskq.local.busy.last) {
        ASSERT(pp->sched.taskq.local.busy.first);
        pp->sched.taskq.local.busy.last->u.alive.next = ptp;
    }
    else {
        /* First busy task: raise flag and set up the caller table. */
        int i;
#ifdef DEBUG
        erts_aint32_t flags;
#endif
        pp->sched.taskq.local.busy.first = ptp;

#ifdef DEBUG
        flags =
#endif
            erts_atomic32_read_bor_nob(&pp->sched.flags,
                                       ERTS_PTS_FLG_HAVE_BUSY_TASKS);
        ASSERT(!(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS));

        ASSERT(!tabp);

        tabp = busy_caller_table_alloc();
        pp->sched.taskq.local.busy.table = tabp;
        for (i = 0; i < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; i++)
            tabp->bucket[i] = NULL;
        tabp->pre_alloc_busy_caller.caller = am_undefined;
    }
    pp->sched.taskq.local.busy.last = ptp;

    /* Find or create the caller's table entry and count this task. */
    bix = caller2bix(caller);
    ASSERT(tabp);
    bcp = tabp->bucket[bix];

    while (bcp && bcp->caller != caller)
        bcp = bcp->next;

    if (bcp)
        bcp->count++;
    else {
        /* Prefer the pre-allocated entry when it is free. */
        if (tabp->pre_alloc_busy_caller.caller == am_undefined)
            bcp = &tabp->pre_alloc_busy_caller;
        else
            bcp = erts_alloc(ERTS_ALC_T_BUSY_CALLER,
                             sizeof(ErtsPortTaskBusyCaller));
        bcp->caller = caller;
        bcp->count = 1;
        bcp->next = tabp->bucket[bix];
        tabp->bucket[bix] = bcp;
    }

    bcp->last = ptp;
}
372
/*
 * Check whether a signal task must be moved into the busy queue to
 * preserve signal order: if its caller already has tasks in the busy
 * queue, the task is inserted directly after that caller's last busy
 * task. Returns !0 if the task was moved; otherwise 0.
 */
static ERTS_INLINE int
check_sig_dep_move_to_busy_queue(Port *pp, ErtsPortTask *ptp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTask *last_ptp;
    ErtsPortTaskBusyCaller *bcp;
    int bix;
    Eterm caller;

    ASSERT(ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP);
    ASSERT(pp->sched.taskq.local.busy.last);
    ASSERT(tabp);


    /*
     * We are either not busy, or the task does not imply wait on busy port.
     * However, due to the signaling order requirements the task might depend
     * on other tasks in the busy queue.
     */

    caller = task_caller(ptp);
    bix = caller2bix(caller);
    bcp = tabp->bucket[bix];
    while (bcp && bcp->caller != caller)
        bcp = bcp->next;

    if (!bcp)
        return 0; /* no earlier busy tasks from this caller */

    /*
     * There are other tasks that we depend on in the busy queue;
     * move into busy queue.
     */

    bcp->count++;
    /* Insert directly after the caller's last task in the busy queue. */
    last_ptp = bcp->last;
    ptp->u.alive.next = last_ptp->u.alive.next;
    if (!ptp->u.alive.next) {
        ASSERT(pp->sched.taskq.local.busy.last == last_ptp);
        pp->sched.taskq.local.busy.last = ptp;
    }
    last_ptp->u.alive.next = ptp;
    bcp->last = ptp;

    return 1;
}
419
/*
 * Move the leading run of busy-queue tasks that no longer depend on
 * any busy-wait task back to the ordinary queue, updating the busy
 * caller table and releasing it when the busy queue becomes empty.
 */
static void
no_sig_dep_move_from_busyq(Port *pp)
{
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTask *first_ptp, *last_ptp, *ptp;
    ErtsPortTaskBusyCaller **prev_bcpp = NULL, *bcp = NULL;

    /*
     * Move tasks at the head of the busy queue that no longer
     * have any dependencies to busy wait tasks into the ordinary
     * queue.
     */

    first_ptp = ptp = pp->sched.taskq.local.busy.first;

    ASSERT(ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));
    ASSERT(tabp);

    do {
        Eterm caller = task_caller(ptp);

        /* Look up the caller's table entry unless it is the same
         * caller as for the previous task. */
        if (!bcp || bcp->caller != caller) {
            int bix = caller2bix(caller);

            prev_bcpp = &tabp->bucket[bix];
            bcp = tabp->bucket[bix];
            ASSERT(bcp);
            while (bcp->caller != caller) {
                ASSERT(bcp);
                prev_bcpp = &bcp->next;
                bcp = bcp->next;
            }
        }

        ASSERT(bcp->caller == caller);
        ASSERT(bcp->count > 0);

        if (--bcp->count == 0) {
            /* Caller's last busy task: unlink its table entry. */
            *prev_bcpp = bcp->next;
            if (bcp == &tabp->pre_alloc_busy_caller)
                bcp->caller = am_undefined;
            else
                erts_free(ERTS_ALC_T_BUSY_CALLER, bcp);
        }

        last_ptp = ptp;
        ptp = ptp->u.alive.next;
    } while (ptp && !(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY));

    /* Detach the moved prefix [first_ptp, last_ptp] from the busy queue. */
    pp->sched.taskq.local.busy.first = last_ptp->u.alive.next;
    if (!pp->sched.taskq.local.busy.first) {
        /* Busy queue now empty: drop flag and free the table. */
#ifdef DEBUG
        int bix;
        erts_aint32_t flags =
#endif
            erts_atomic32_read_band_nob(
                &pp->sched.flags,
                ~ERTS_PTS_FLG_HAVE_BUSY_TASKS);
        ASSERT(flags & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
#ifdef DEBUG
        for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
            ASSERT(!tabp->bucket[bix]);
        }
#endif
        busy_caller_table_free(tabp);
        pp->sched.taskq.local.busy.last = NULL;
        pp->sched.taskq.local.busy.table = NULL;
    }
    /* Prepend the moved tasks to the ordinary (local) queue. */
    last_ptp->u.alive.next = pp->sched.taskq.local.first;
    pp->sched.taskq.local.first = first_ptp;
}
491
492 #ifdef ERTS_HARD_DEBUG_TASK_QUEUES
493
/*
 * Hard-debug consistency check of the busy queue vs. the busy caller
 * table: every busy task's caller must have a table entry, per-caller
 * counts must match the queue contents, no busy task may also appear
 * on the non-busy queue, and the queue's last element must equal
 * busy.last. Only compiled under ERTS_HARD_DEBUG_TASK_QUEUES.
 */
static void
chk_task_queues(Port *pp, ErtsPortTask *execq, int processing_busy_queue)
{
    Sint tot_count, tot_table_count;
    int bix;
    ErtsPortTask *ptp, *last;
    ErtsPortTask *first = processing_busy_queue ? execq : pp->sched.taskq.local.busy.first;
    ErtsPortTask *nb_task_queue = processing_busy_queue ? pp->sched.taskq.local.first : execq;
    ErtsPortTaskBusyCallerTable *tabp = pp->sched.taskq.local.busy.table;
    ErtsPortTaskBusyCaller *bcp;

    if (!first) {
        /* Empty busy queue: neither table, tail pointer, nor flag may
         * remain. */
        ASSERT(!tabp);
        ASSERT(!pp->sched.taskq.local.busy.last);
        ASSERT(!(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS));
        return;
    }

    ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_HAVE_BUSY_TASKS);
    ASSERT(tabp);

    tot_count = 0;
    ptp = first;
    while (ptp) {
        Sint count = 0;
        Eterm caller = task_caller(ptp);
        int bix = caller2bix(caller);
        /* The caller must be present in the table. */
        for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
            if (bcp->caller == caller)
                break;
        ASSERT(bcp && bcp->caller == caller);

        ASSERT(bcp->last);
        /* Walk this caller's consecutive run of tasks up to bcp->last. */
        while (1) {
            ErtsPortTask *ptp2;

            ASSERT(caller == task_caller(ptp));
            count++;
            tot_count++;
            last = ptp;

            /* A busy task must not also be on the non-busy queue. */
            for (ptp2 = nb_task_queue; ptp2; ptp2 = ptp2->u.alive.next) {
                ASSERT(ptp != ptp2);
            }

            if (ptp == bcp->last)
                break;
            ptp = ptp->u.alive.next;
        }

        ASSERT(count == bcp->count);
        ptp = ptp->u.alive.next;
    }

    /* Sum of table counts must equal the number of queued busy tasks. */
    tot_table_count = 0;
    for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
        for (bcp = tabp->bucket[bix]; bcp; bcp = bcp->next)
            tot_table_count += bcp->count;
    }

    ASSERT(tot_count == tot_table_count);

    ASSERT(last == pp->sched.taskq.local.busy.last);
}
558
559 #endif /* ERTS_HARD_DEBUG_TASK_QUEUES */
560
561 /*
562 * Task handle manipulation.
563 */
564
/* Clear a task handle; the release barrier pairs with the acquire
 * read in handle2task(). */
static ERTS_INLINE void
reset_port_task_handle(ErtsPortTaskHandle *pthp)
{
    erts_atomic_set_relb(pthp, (erts_aint_t) NULL);
}

/* Read the task a handle currently refers to; NULL when unset. */
static ERTS_INLINE ErtsPortTask *
handle2task(ErtsPortTaskHandle *pthp)
{
    return (ErtsPortTask *) erts_atomic_read_acqb(pthp);
}
576
577 static ERTS_INLINE void
reset_handle(ErtsPortTask * ptp)578 reset_handle(ErtsPortTask *ptp)
579 {
580 if (ptp->u.alive.handle) {
581 ASSERT(ptp == handle2task(ptp->u.alive.handle));
582 reset_port_task_handle(ptp->u.alive.handle);
583 }
584 }
585
/*
 * Reset the handle of an executed I/O task and notify check-io that
 * the task has executed (check-io resets the handle via the supplied
 * callback). For scheduler-pollset events the notification is only
 * performed when ERTS_PORT_SFLG_CHECK_FD_CLEANUP is set on the port;
 * otherwise the handle is reset directly.
 */
static ERTS_INLINE void
reset_executed_io_task_handle(Port *prt, ErtsPortTask *ptp)
{
    if (ptp->u.alive.handle) {
        ASSERT(ptp == handle2task(ptp->u.alive.handle));
#if ERTS_POLL_USE_SCHEDULER_POLLING
        if (ptp->u.alive.td.io.is_scheduler_event) {
            if ((erts_atomic32_read_nob(&prt->state) & ERTS_PORT_SFLG_CHECK_FD_CLEANUP)) {
                erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle,
                                                  reset_port_task_handle);
                /* Cleanup request consumed; clear the flag. */
                erts_atomic32_read_band_nob(&prt->state, ~ERTS_PORT_SFLG_CHECK_FD_CLEANUP);
            } else {
                reset_port_task_handle(ptp->u.alive.handle);
            }
        } else
#endif
        {
            /* The port task handle is reset inside task_executed */
            erts_io_notify_port_task_executed(ptp->type, ptp->u.alive.handle,
                                              reset_port_task_handle);
        }
    }
}
609
610 static ERTS_INLINE void
set_handle(ErtsPortTask * ptp,ErtsPortTaskHandle * pthp)611 set_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
612 {
613 ptp->u.alive.handle = pthp;
614 if (pthp) {
615 erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
616 ASSERT(ptp == handle2task(ptp->u.alive.handle));
617 }
618 }
619
620 static ERTS_INLINE void
set_tmp_handle(ErtsPortTask * ptp,ErtsPortTaskHandle * pthp)621 set_tmp_handle(ErtsPortTask *ptp, ErtsPortTaskHandle *pthp)
622 {
623 ptp->u.alive.handle = NULL;
624 if (pthp) {
625 /*
626 * IMPORTANT! Task either need to be aborted, or task handle
627 * need to be detached before thread progress has been made.
628 */
629 erts_atomic_set_relb(pthp, (erts_aint_t) ptp);
630 }
631 }
632
633
634 /*
635 * Busy port queue management
636 */
637
/*
 * Handle a pending ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q request: if the
 * busy-port queue size has dropped below the low watermark, clear the
 * busy-port-q state (resuming suspended processes when the port
 * thereby became non-busy); otherwise just clear the check flag.
 * Returns the updated view of the scheduler flags.
 */
static erts_aint32_t
check_unset_busy_port_q(Port *pp,
                        erts_aint32_t flags,
                        ErtsPortTaskBusyPortQ *bpq)
{
    ErlDrvSizeT qsize, low;
    int resume_procs = 0;

    ASSERT(bpq);
    ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));

    /* Sched lock keeps size/low reads and the flag updates consistent
     * with concurrent limit changes. */
    erts_port_task_sched_lock(&pp->sched);
    qsize = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
    low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
    if (qsize < low) {
        erts_aint32_t mask = ~(ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
                               | ERTS_PTS_FLG_BUSY_PORT_Q);
        flags = erts_atomic32_read_band_relb(&pp->sched.flags, mask);
        /* If busy-port-q was the only busy reason, the port just
         * became non-busy; suspended processes must be resumed. */
        if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
            resume_procs = 1;
    }
    else if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q) {
        flags = erts_atomic32_read_band_relb(&pp->sched.flags,
                                             ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
        /* read_band returns the old value; reflect the clear locally. */
        flags &= ~ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
    }
    erts_port_task_sched_unlock(&pp->sched);
    if (resume_procs)
        erts_port_resume_procs(pp);

    return flags;
}
670
/*
 * Account for `size` bytes of proc-to-port signal data that was
 * aborted (never executed): shrink the busy-port queue size and, if
 * the port is busy-port-q and now below the low watermark, request a
 * busy-state re-check (no port lock held here, so the check itself is
 * deferred).
 */
static ERTS_INLINE void
aborted_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
        return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
                                                  (erts_aint_t) -size);
    ASSERT(qsz + size > qsz); /* no underflow */
    flags = erts_atomic32_read_nob(&pp->sched.flags);
    ASSERT(pp->sched.taskq.bpq);
    /* Only act when busy-port-q is set and no re-check is already
     * pending. */
    if ((flags & (ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q
                  | ERTS_PTS_FLG_BUSY_PORT_Q)) != ERTS_PTS_FLG_BUSY_PORT_Q)
        return;
    if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low))
        erts_atomic32_read_bor_nob(&pp->sched.flags,
                                   ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
}
697
/*
 * Account for `size` bytes of proc-to-port signal data that has been
 * dequeued for execution: shrink the busy-port queue size and, if
 * busy and below the low watermark, attempt to unset the busy state
 * immediately via check_unset_busy_port_q() (which asserts that the
 * port lock is held on this path).
 */
static ERTS_INLINE void
dequeued_proc2port_data(Port *pp, ErlDrvSizeT size)
{
    ErtsPortTaskBusyPortQ *bpq;
    erts_aint32_t flags;
    ErlDrvSizeT qsz;

    ASSERT(pp->sched.taskq.bpq);

    if (size == 0)
        return;

    bpq = pp->sched.taskq.bpq;

    qsz = (ErlDrvSizeT) erts_atomic_add_read_acqb(&bpq->size,
                                                  (erts_aint_t) -size);
    ASSERT(qsz + size > qsz); /* no underflow */
    flags = erts_atomic32_read_nob(&pp->sched.flags);
    if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q))
        return;
    if (qsz < (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->low))
        check_unset_busy_port_q(pp, flags, bpq);
}
721
/*
 * Account for enqueued proc-to-port signal data: grow the busy-port
 * queue size and set ERTS_PTS_FLG_BUSY_PORT_Q when the high watermark
 * is exceeded. If the size has already dropped below the low
 * watermark again by then, a busy-state re-check is requested.
 * Returns the (possibly updated) scheduler flags.
 */
static ERTS_INLINE erts_aint32_t
enqueue_proc2port_data(Port *pp,
                       ErtsProc2PortSigData *sigdp,
                       erts_aint32_t flags)
{
    ErtsPortTaskBusyPortQ *bpq = pp->sched.taskq.bpq;
    /* Nothing to do unless this is signal data on a port that uses a
     * busy-port queue. */
    if (sigdp && bpq) {
        ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
        if (size) {
            erts_aint_t asize = erts_atomic_add_read_acqb(&bpq->size,
                                                          (erts_aint_t) size);
            ErlDrvSizeT qsz = (ErlDrvSizeT) asize;

            ASSERT(qsz - size < qsz); /* no overflow */

            if (!(flags & ERTS_PTS_FLG_BUSY_PORT_Q) && qsz > bpq->high) {
                flags = erts_atomic32_read_bor_acqb(&pp->sched.flags,
                                                    ERTS_PTS_FLG_BUSY_PORT_Q);
                /* bor returns the old value; reflect the set bit. */
                flags |= ERTS_PTS_FLG_BUSY_PORT_Q;
                /* Size may have shrunk concurrently; if it is already
                 * below low again, request a re-check. */
                qsz = (ErlDrvSizeT) erts_atomic_read_acqb(&bpq->size);
                if (qsz < (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low)) {
                    flags = (erts_atomic32_read_bor_relb(
                                 &pp->sched.flags,
                                 ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q));
                    flags |= ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q;
                }
            }
            ASSERT(!(flags & ERTS_PTS_FLG_EXIT));
        }
    }
    return flags;
}
754
755 /*
756 * erl_drv_busy_msgq_limits() is called by drivers either reading or
757 * writing the limits.
758 *
759 * A limit of zero is interpreted as a read only request (using a
760 * limit of zero would not be useful). Other values are interpreted
761 * as a write-read request.
762 */
763
/*
 * Read and/or write the busy message queue low/high watermarks of a
 * driver port (see header comment above). Maintains the invariant
 * low <= high, and re-evaluates the busy state when limits change.
 */
void
erl_drv_busy_msgq_limits(ErlDrvPort dport, ErlDrvSizeT *lowp, ErlDrvSizeT *highp)
{
    Port *pp = erts_drvport2port(dport);
    ErtsPortTaskBusyPortQ *bpq;
    int written = 0, resume_procs = 0;
    ErlDrvSizeT low, high;

    /* Invalid port, or the busy msgq feature is already disabled. */
    if (pp == ERTS_INVALID_ERL_DRV_PORT || !(bpq = pp->sched.taskq.bpq)) {
        if (lowp)
            *lowp = ERL_DRV_BUSY_MSGQ_DISABLED;
        if (highp)
            *highp = ERL_DRV_BUSY_MSGQ_DISABLED;
        return;
    }

    low = lowp ? *lowp : 0;
    high = highp ? *highp : 0;

    erts_port_task_sched_lock(&pp->sched);

    if (low == ERL_DRV_BUSY_MSGQ_DISABLED
        || high == ERL_DRV_BUSY_MSGQ_DISABLED) {
        /* Disable busy msgq feature */
        erts_aint32_t flags;
        pp->sched.taskq.bpq = NULL;
        flags = ~(ERTS_PTS_FLG_BUSY_PORT_Q|ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
        flags = erts_atomic32_read_band_acqb(&pp->sched.flags, flags);
        /* If busy-port-q was the only busy reason, the port just
         * became non-busy; resume suspended processes. */
        if ((flags & ERTS_PTS_FLGS_BUSY) == ERTS_PTS_FLG_BUSY_PORT_Q)
            resume_procs = 1;
    }
    else {

        /* A zero limit means read-only for that limit. */
        if (!low)
            low = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->low);
        else {
            if (bpq->high < low)
                bpq->high = low; /* push high up to keep low <= high */
            erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
            written = 1;
        }

        if (!high)
            high = bpq->high;
        else {
            if (low > high) {
                low = high; /* pull low down to keep low <= high */
                erts_atomic_set_relb(&bpq->low, (erts_aint_t) low);
            }
            bpq->high = high;
            written = 1;
        }

        if (written) {
            /* Re-evaluate busy state against the new limits. */
            ErlDrvSizeT size = (ErlDrvSizeT) erts_atomic_read_nob(&bpq->size);
            if (size > high)
                erts_atomic32_read_bor_relb(&pp->sched.flags,
                                            ERTS_PTS_FLG_BUSY_PORT_Q);
            else if (size < low)
                erts_atomic32_read_bor_relb(&pp->sched.flags,
                                            ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q);
        }
    }

    erts_port_task_sched_unlock(&pp->sched);

    if (resume_procs)
        erts_port_resume_procs(pp);
    if (lowp)
        *lowp = low;
    if (highp)
        *highp = high;
}
837
838 /*
839 * No-suspend handles.
840 */
841
/* Thread-progress callback: free a no-suspend handle list entry. */
static void
free_port_task_handle_list(void *vpthlp)
{
    erts_free(ERTS_ALC_T_PT_HNDL_LIST, vpthlp);
}
847
/*
 * Free a no-suspend handle list entry once all managed threads have
 * made thread progress.
 */
static void
schedule_port_task_handle_list_free(ErtsPortTaskHandleList *pthlp)
{
    erts_schedule_thr_prgr_later_cleanup_op(free_port_task_handle_list,
                                            (void *) pthlp,
                                            &pthlp->u.release,
                                            sizeof(ErtsPortTaskHandleList));
}
856
/*
 * Invoke a signal task's callback with an abort reason instead of
 * executing it. When the port uses a busy-port queue (bpq_data != 0),
 * the data size is read *before* the callback runs, since the
 * callback presumably may invalidate the signal data.
 */
static ERTS_INLINE void
abort_signal_task(Port *pp,
                  int abort_type,
                  ErtsPortTaskType type,
                  ErtsPortTaskTypeData *tdp,
                  int bpq_data)
{

    ASSERT(type == ERTS_PORT_TASK_PROC_SIG);

    if (!bpq_data)
        tdp->psig.callback(NULL,
                           ERTS_PORT_SFLG_INVALID,
                           abort_type,
                           &tdp->psig.data);
    else {
        /* Size must be captured before the callback is invoked. */
        ErlDrvSizeT size = erts_proc2port_sig_command_data_size(&tdp->psig.data);
        tdp->psig.callback(NULL,
                           ERTS_PORT_SFLG_INVALID,
                           abort_type,
                           &tdp->psig.data);
        aborted_proc2port_data(pp, size);
    }
}
881
882
/*
 * Abort a signal task with the ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND
 * reason (used for tasks scheduled with the no-suspend option).
 */
static ERTS_INLINE void
abort_nosuspend_task(Port *pp,
                     ErtsPortTaskType type,
                     ErtsPortTaskTypeData *tdp,
                     int bpq_data)
{
    abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT_NOSUSPEND, type, tdp, bpq_data);
}
891
/*
 * Unlink and return the leading run of no-suspend handle list entries
 * whose tasks are no longer scheduled; returns NULL if the first
 * entry is still scheduled. Clears ERTS_PTS_FLG_HAVE_NS_TASKS when
 * the list becomes empty. Caller must hold the port task sched lock.
 */
static ErtsPortTaskHandleList *
get_free_nosuspend_handles(Port *pp)
{
    ErtsPortTaskHandleList *nshp, *last_nshp = NULL;

    ERTS_LC_ASSERT(erts_port_task_sched_lock_is_locked(&pp->sched));

    nshp = pp->sched.taskq.local.busy.nosuspend;

    /* Find the end of the leading run of no-longer-scheduled handles. */
    while (nshp && !erts_port_task_is_scheduled(&nshp->handle)) {
        last_nshp = nshp;
        nshp = nshp->u.next;
    }

    if (!last_nshp)
        nshp = NULL; /* nothing to hand back */
    else {
        /* Detach the run [head, last_nshp] from the list. */
        nshp = pp->sched.taskq.local.busy.nosuspend;
        pp->sched.taskq.local.busy.nosuspend = last_nshp->u.next;
        last_nshp->u.next = NULL;
        if (!pp->sched.taskq.local.busy.nosuspend)
            erts_atomic32_read_band_nob(&pp->sched.flags,
                                        ~ERTS_PTS_FLG_HAVE_NS_TASKS);
    }
    return nshp;
}
918
919 static void
free_nosuspend_handles(ErtsPortTaskHandleList * free_nshp)920 free_nosuspend_handles(ErtsPortTaskHandleList *free_nshp)
921 {
922 while (free_nshp) {
923 ErtsPortTaskHandleList *nshp = free_nshp;
924 free_nshp = free_nshp->u.next;
925 schedule_port_task_handle_list_free(nshp);
926 }
927 }
928
929 /*
930 * Port queue operations
931 */
932
/*
 * Append a port last on a run queue's port queue and bump the queue
 * length counter. Caller must hold the run queue lock.
 */
static ERTS_INLINE void
enqueue_port(ErtsRunQueue *runq, Port *pp)
{
    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
    pp->sched.next = NULL;
    if (runq->ports.end) {
        ASSERT(runq->ports.start);
        runq->ports.end->sched.next = pp;
    }
    else {
        ASSERT(!runq->ports.start);
        runq->ports.start = pp;
    }

    runq->ports.end = pp;
    ASSERT(runq->ports.start && runq->ports.end);

    erts_inc_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);

    /* While halting, signal that the run queue became non-empty. */
    if (ERTS_RUNQ_FLGS_GET_NOB(runq) & ERTS_RUNQ_FLG_HALTING)
        erts_non_empty_runq(runq);
}
955
956 static ERTS_INLINE Port *
pop_port(ErtsRunQueue * runq)957 pop_port(ErtsRunQueue *runq)
958 {
959 Port *pp = runq->ports.start;
960 ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
961 if (!pp) {
962 ASSERT(!runq->ports.end);
963 }
964 else {
965 runq->ports.start = runq->ports.start->sched.next;
966 if (!runq->ports.start) {
967 ASSERT(runq->ports.end == pp);
968 runq->ports.end = NULL;
969 }
970 erts_dec_runq_len(runq, &runq->ports.info, ERTS_PORT_PRIO_LEVEL);
971 }
972
973 ASSERT(runq->ports.start || !runq->ports.end);
974 ASSERT(runq->ports.end || !runq->ports.start);
975 return pp;
976 }
977
978 /*
979 * Task queue operations
980 */
981
/*
 * Enqueue a task on the port's in-queue. Returns !0 on success, 0 if
 * the port is exiting (or busy, when a no-suspend handle is given),
 * in which case nothing is enqueued. On success the no-suspend handle
 * (if any) is linked onto the port and busy-port-q accounting is
 * performed; *flagsp is always set to the resulting scheduler flags.
 */
static ERTS_INLINE int
enqueue_task(Port *pp,
             ErtsPortTask *ptp,
             ErtsProc2PortSigData *sigdp,
             ErtsPortTaskHandleList *ns_pthlp,
             erts_aint32_t *flagsp)

{
    int res;
    erts_aint32_t fail_flags = ERTS_PTS_FLG_EXIT;
    erts_aint32_t flags;
    ptp->u.alive.next = NULL;
    /* No-suspend signals must also fail when the port is busy. */
    if (ns_pthlp)
        fail_flags |= ERTS_PTS_FLG_BUSY_PORT;
    erts_port_task_sched_lock(&pp->sched);
    flags = erts_atomic32_read_nob(&pp->sched.flags);
    if (flags & fail_flags)
        res = 0;
    else {
        if (ns_pthlp) {
            /* Link the no-suspend handle first on the port's list. */
            ns_pthlp->u.next = pp->sched.taskq.local.busy.nosuspend;
            pp->sched.taskq.local.busy.nosuspend = ns_pthlp;
        }
        /* Append the task last on the in-queue. */
        if (pp->sched.taskq.in.last) {
            ASSERT(pp->sched.taskq.in.first);
            ASSERT(!pp->sched.taskq.in.last->u.alive.next);

            pp->sched.taskq.in.last->u.alive.next = ptp;
        }
        else {
            ASSERT(!pp->sched.taskq.in.first);

            pp->sched.taskq.in.first = ptp;
        }
        pp->sched.taskq.in.last = ptp;
        flags = enqueue_proc2port_data(pp, sigdp, flags);
        res = 1;
    }
    erts_port_task_sched_unlock(&pp->sched);
    *flagsp = flags;
    return res;
}
1024
/*
 * Prepare execution of the port's tasks: pick the queue to start with
 * (the busy queue, unless it is empty or the port is busy) and
 * atomically transition the port's state from in-run-queue to
 * executing.
 */
static ERTS_INLINE void
prepare_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    erts_aint32_t act = erts_atomic32_read_nob(&pp->sched.flags);

    if (!pp->sched.taskq.local.busy.first || (act & ERTS_PTS_FLG_BUSY_PORT)) {
        *execqp = pp->sched.taskq.local.first;
        *processing_busy_q_p = 0;
    }
    else {
        *execqp = pp->sched.taskq.local.busy.first;
        *processing_busy_q_p = 1;
    }

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    /* IN_RUNQ -> EXEC flag transition via cmpxchg retry loop. */
    while (1) {
        erts_aint32_t new, exp;

        new = exp = act;

        new &= ~ERTS_PTS_FLG_IN_RUNQ;
        new |= ERTS_PTS_FLG_EXEC;

        act = erts_atomic32_cmpxchg_nob(&pp->sched.flags, new, exp);

        ASSERT(act & ERTS_PTS_FLG_IN_RUNQ);

        if (exp == act)
            break;
    }
}
1057
/* finalize_exec() return value != 0 if port should remain active */
/*
 * Finalize execution of the port's tasks: write back the remaining
 * execution queue, handle a pending busy-port-q re-check, and
 * atomically leave the EXEC state (re-entering IN_RUNQ if more tasks
 * remain). Emits scheduling trace/profile events when enabled.
 */
static ERTS_INLINE int
finalize_exec(Port *pp, ErtsPortTask **execq, int processing_busy_q)
{
    erts_aint32_t act;
    unsigned int prof_runnable_ports;

    /* Write the (possibly advanced) queue head back to the port. */
    if (!processing_busy_q)
        pp->sched.taskq.local.first = *execq;
    else {
        pp->sched.taskq.local.busy.first = *execq;
        ASSERT(*execq);
    }

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execq, processing_busy_q);

    *execq = NULL;

    act = erts_atomic32_read_nob(&pp->sched.flags);
    if (act & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
        act = check_unset_busy_port_q(pp, act, pp->sched.taskq.bpq);

    /* When profiling runnable ports, hold the sched lock so the
     * inactive event below is consistent with the flag transition. */
    prof_runnable_ports = erts_system_profile_flags.runnable_ports;
    if (prof_runnable_ports)
        erts_port_task_sched_lock(&pp->sched);

    /* EXEC -> (IN_RUNQ if tasks remain) via cmpxchg retry loop. */
    while (1) {
        erts_aint32_t new, exp;

        new = exp = act;

        new &= ~ERTS_PTS_FLG_EXEC;
        if (act & ERTS_PTS_FLG_HAVE_TASKS)
            new |= ERTS_PTS_FLG_IN_RUNQ;

        act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);

        ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_IN_RUNQ));
        ERTS_LC_ASSERT(!(act & ERTS_PTS_FLG_EXEC_IMM));

        if (exp == act)
            break;
    }

    if (prof_runnable_ports | IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
        /* trace port scheduling, out */
        if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS))
            trace_sched_ports(pp, am_out);
        if (prof_runnable_ports) {
            if (!(act & (ERTS_PTS_FLG_EXEC_IMM|ERTS_PTS_FLG_HAVE_TASKS)))
                profile_runnable_port(pp, am_inactive);
            erts_port_task_sched_unlock(&pp->sched);
        }
    }

    return (act & ERTS_PTS_FLG_HAVE_TASKS) != 0;
}
1115
/*
 * Select which task queue (busy or normal) execution should continue
 * from, based on the port's current busy state, writing back the head
 * of the queue being left. Returns the current scheduler flags.
 */
static ERTS_INLINE erts_aint32_t
select_queue_for_exec(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
{
    erts_aint32_t flags = erts_atomic32_read_nob(&pp->sched.flags);

    if (flags & ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q)
        flags = check_unset_busy_port_q(pp, flags, pp->sched.taskq.bpq);

    ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    if (flags & ERTS_PTS_FLG_BUSY_PORT) {
        /* Port is busy: if we were on the busy queue, switch to the
         * normal queue. */
        if (*processing_busy_q_p) {
            ErtsPortTask *ptp;

            ptp = pp->sched.taskq.local.busy.first = *execqp;
            if (!ptp)
                pp->sched.taskq.local.busy.last = NULL;
            else if (!(ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY))
                /* Head no longer waits on busy; release the tasks
                 * that only depended on it. */
                no_sig_dep_move_from_busyq(pp);

            *execqp = pp->sched.taskq.local.first;
            *processing_busy_q_p = 0;

            ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
        }

        return flags;
    }

    /* Not busy */

    if (!*processing_busy_q_p && pp->sched.taskq.local.busy.first) {
        /* Prefer the busy queue when it has tasks. */
        pp->sched.taskq.local.first = *execqp;
        *execqp = pp->sched.taskq.local.busy.first;
        *processing_busy_q_p = 1;

        ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);
    }

    return flags;
}
1157
1158 /*
1159 * check_task_for_exec() returns a value !0 if the task
1160 * is ok to execute; otherwise 0.
1161 */
static ERTS_INLINE int
check_task_for_exec(Port *pp,
                    erts_aint32_t flags,
                    ErtsPortTask **execqp,
                    int *processing_busy_q_p,
                    ErtsPortTask *ptp)
{
    /*
     * Check whether 'ptp' (already unlinked from the current exec
     * queue) may execute now. If not, it is moved to the busy queue
     * and 0 is returned; otherwise 1 is returned.
     */

    if (!*processing_busy_q_p) {
        /* Processing normal queue */

        ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);

        if ((flags & ERTS_PTS_FLG_BUSY_PORT)
            && (ptp->u.alive.flags & ERTS_PT_FLG_WAIT_BUSY)) {
            /* Port is busy and this task must wait for it to become
               non-busy; defer it to the busy queue. */
            busy_wait_move_to_busy_queue(pp, ptp);
            ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

            return 0;
        }

        if (pp->sched.taskq.local.busy.last
            && (ptp->u.alive.flags & ERTS_PT_FLG_SIG_DEP)) {
            /* Task has a signal-order dependency and the busy queue is
               non-empty; it may only run if its dependency is not in
               the busy queue (otherwise it is moved behind it). */
            int res = !check_sig_dep_move_to_busy_queue(pp, ptp);
            ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

            return res;
        }

    }
    else {
        /* Processing busy queue */

        ASSERT(!(flags & ERTS_PTS_FLG_BUSY_PORT));

        ERTS_PT_DBG_CHK_TASK_QS(pp, ptp, *processing_busy_q_p);

        popped_from_busy_queue(pp, ptp, !*execqp);

        if (!*execqp) {
            /* Busy queue drained; continue with the normal queue. */
            *execqp = pp->sched.taskq.local.first;
            *processing_busy_q_p = 0;
        }

        ERTS_PT_DBG_CHK_TASK_QS(pp, *execqp, *processing_busy_q_p);

    }

    return 1;
}
1214
static ErtsPortTask *
fetch_in_queue(Port *pp, ErtsPortTask **execqp)
{
    /*
     * Move the shared "in" queue (filled by other threads/schedulers)
     * into the private execution queue. Returns the first fetched task,
     * or NULL if the in queue was empty, in which case the HAVE_TASKS
     * flag is cleared (still under the sched lock, so no enqueue race).
     */
    ErtsPortTask *ptp;
    ErtsPortTaskHandleList *free_nshp = NULL;

    erts_port_task_sched_lock(&pp->sched);

    ptp = pp->sched.taskq.in.first;
    pp->sched.taskq.in.first = NULL;
    pp->sched.taskq.in.last = NULL;
    if (ptp)
        *execqp = ptp->u.alive.next;
    else
        erts_atomic32_read_band_nob(&pp->sched.flags,
                                    ~ERTS_PTS_FLG_HAVE_TASKS);


    /* While holding the sched lock anyway, collect no-suspend handles
       that have become free so they can be released below. */
    if (pp->sched.taskq.local.busy.nosuspend)
        free_nshp = get_free_nosuspend_handles(pp);

    erts_port_task_sched_unlock(&pp->sched);

    if (free_nshp)
        free_nosuspend_handles(free_nshp);

    return ptp;
}
1243
1244 static ERTS_INLINE ErtsPortTask *
select_task_for_exec(Port * pp,ErtsPortTask ** execqp,int * processing_busy_q_p)1245 select_task_for_exec(Port *pp,
1246 ErtsPortTask **execqp,
1247 int *processing_busy_q_p)
1248 {
1249 ErtsPortTask *ptp;
1250 erts_aint32_t flags;
1251
1252 flags = select_queue_for_exec(pp, execqp, processing_busy_q_p);
1253
1254 while (1) {
1255 ptp = *execqp;
1256 if (ptp)
1257 *execqp = ptp->u.alive.next;
1258 else {
1259 ptp = fetch_in_queue(pp, execqp);
1260 if (!ptp)
1261 return NULL;
1262 }
1263 if (check_task_for_exec(pp, flags, execqp, processing_busy_q_p, ptp))
1264 return ptp;
1265 }
1266 }
1267
1268 /*
1269 * Cut time slice
1270 */
1271
1272 int
erl_drv_consume_timeslice(ErlDrvPort dprt,int percent)1273 erl_drv_consume_timeslice(ErlDrvPort dprt, int percent)
1274 {
1275 Port *pp = erts_drvport2port(dprt);
1276 if (pp == ERTS_INVALID_ERL_DRV_PORT)
1277 return -1;
1278 if (percent < 1)
1279 percent = 1;
1280 else if (100 < percent)
1281 percent = 100;
1282 pp->reds += percent*((CONTEXT_REDS+99)/100);
1283 if (pp->reds < CONTEXT_REDS)
1284 return 0;
1285 pp->reds = CONTEXT_REDS;
1286 return 1;
1287 }
1288
void
erts_port_task_tmp_handle_detach(ErtsPortTaskHandle *pthp)
{
    /* Detach a temporary task handle. The caller must be delaying
       thread progress (checked by the lock-checker assertion), which
       guarantees the task it referred to cannot be freed under us. */
    ERTS_LC_ASSERT(erts_thr_progress_lc_is_delaying());
    reset_port_task_handle(pthp);
}
1295
1296 /*
1297 * Abort a scheduled task.
1298 */
1299
int
erts_port_task_abort(ErtsPortTaskHandle *pthp)
{
    /*
     * Abort a scheduled task identified by its handle.
     *
     * Returns 0 if the task was successfully moved from SCHEDULED to
     * ABORTED, and -1 if the handle was stale or the task was already
     * aborted, executing, or executed. Runs under a thread-progress
     * delay so the task memory stays valid while we inspect it.
     */
    int res;
    ErtsPortTask *ptp;
    ErtsThrPrgrDelayHandle dhndl = erts_thr_progress_unmanaged_delay();

    ptp = handle2task(pthp);
    if (!ptp)
        res = -1;
    else {
        erts_aint32_t old_state;

#ifdef DEBUG
        /* Sanity: a still-scheduled task must point back at the
           handle used to find it (or have no handle at all). */
        ErtsPortTaskHandle *saved_pthp = ptp->u.alive.handle;
        ERTS_THR_READ_MEMORY_BARRIER;
        old_state = erts_atomic32_read_nob(&ptp->state);
        if (old_state == ERTS_PT_STATE_SCHEDULED) {
            ASSERT(!saved_pthp || saved_pthp == pthp);
        }
#endif

        /* Only the SCHEDULED -> ABORTED transition counts as a
           successful abort; any other state means we lost the race. */
        old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
                                              ERTS_PT_STATE_ABORTED,
                                              ERTS_PT_STATE_SCHEDULED);
        if (old_state != ERTS_PT_STATE_SCHEDULED)
            res = - 1; /* Task already aborted, executing, or executed */
        else {
            reset_port_task_handle(pthp);

#if ERTS_POLL_USE_SCHEDULER_POLLING
            /* An aborted scheduler-poll I/O task will never execute,
               so account for it here instead of at execution time. */
            switch (ptp->type) {
            case ERTS_PORT_TASK_INPUT:
            case ERTS_PORT_TASK_OUTPUT:
                if (ptp->u.alive.td.io.is_scheduler_event) {
                    ASSERT(erts_atomic_read_nob(
                               &erts_port_task_outstanding_io_tasks) > 0);
                    erts_atomic_dec_relb(&erts_port_task_outstanding_io_tasks);
                }
                break;
            default:
                break;
            }
#endif

            res = 0;
        }
    }

    erts_thr_progress_unmanaged_continue(dhndl);

    return res;
}
1353
void
erts_port_task_abort_nosuspend_tasks(Port *pp)
{
    /*
     * Abort all pending no-suspend tasks of 'pp'. The list of handle
     * entries is detached under the sched lock; each referenced task
     * is then aborted (SCHEDULED -> ABORTED) under a thread-progress
     * delay, which keeps the task memory alive while we read it.
     */
    ErtsPortTaskHandleList *abort_list;
    ErtsThrPrgrDelayHandle dhndl = ERTS_THR_PRGR_DHANDLE_INVALID;

    erts_port_task_sched_lock(&pp->sched);
    erts_atomic32_read_band_nob(&pp->sched.flags,
                                ~ERTS_PTS_FLG_HAVE_NS_TASKS);
    abort_list = pp->sched.taskq.local.busy.nosuspend;
    pp->sched.taskq.local.busy.nosuspend = NULL;
    erts_port_task_sched_unlock(&pp->sched);

    while (abort_list) {
#ifdef DEBUG
        ErtsPortTaskHandle *saved_pthp;
#endif
        ErtsPortTaskType type;
        ErtsPortTaskTypeData td;
        ErtsPortTaskHandle *pthp;
        ErtsPortTask *ptp;
        ErtsPortTaskHandleList *pthlp;
        erts_aint32_t old_state;

        pthlp = abort_list;
        abort_list = pthlp->u.next;

        /* (Re)acquire the thread-progress delay for each iteration;
           a managed thread never needs one. */
        if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
            dhndl = erts_thr_progress_unmanaged_delay();

        pthp = &pthlp->handle;
        ptp = handle2task(pthp);
        if (!ptp) {
            /* Task already gone; just free the handle entry. */
            if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
                erts_thr_progress_unmanaged_continue(dhndl);
            schedule_port_task_handle_list_free(pthlp);
            continue;
        }

#ifdef DEBUG
        /* Sanity: a still-scheduled task must point back at this handle. */
        saved_pthp = ptp->u.alive.handle;
        ERTS_THR_READ_MEMORY_BARRIER;
        old_state = erts_atomic32_read_nob(&ptp->state);
        if (old_state == ERTS_PT_STATE_SCHEDULED) {
            ASSERT(saved_pthp == pthp);
        }
#endif

        old_state = erts_atomic32_cmpxchg_nob(&ptp->state,
                                              ERTS_PT_STATE_ABORTED,
                                              ERTS_PT_STATE_SCHEDULED);
        if (old_state != ERTS_PT_STATE_SCHEDULED) {
            /* Task already aborted, executing, or executed */
            if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
                erts_thr_progress_unmanaged_continue(dhndl);
            schedule_port_task_handle_list_free(pthlp);
            continue;
        }

        reset_port_task_handle(pthp);

        /* Copy out what abort_nosuspend_task() needs before releasing
           the thread-progress delay (after which the task may be freed). */
        type = ptp->type;
        td = ptp->u.alive.td;

        if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
            erts_thr_progress_unmanaged_continue(dhndl);
        schedule_port_task_handle_list_free(pthlp);

        abort_nosuspend_task(pp, type, &td, pp->sched.taskq.bpq != NULL);
    }
}
1425
1426 /*
1427 * Schedule a task.
1428 */
1429
int
erts_port_task_schedule(Eterm id,
                        ErtsPortTaskHandle *pthp,
                        ErtsPortTaskType type,
                        ...)
{
    /*
     * Schedule a task on the port identified by 'id'. The variadic
     * arguments depend on 'type':
     *
     *   ERTS_PORT_TASK_INPUT / ERTS_PORT_TASK_OUTPUT:
     *       ErlDrvEvent event
     *       int is_scheduler_event   (only with scheduler polling)
     *
     *   ERTS_PORT_TASK_PROC_SIG:
     *       ErtsProc2PortSigData *sigdp
     *       ErtsProc2PortSigCallback callback
     *       int flags
     *
     * Returns 0 on success and -1 on failure (unknown/dead port).
     * On success the port is enqueued on a run-queue unless it is
     * already scheduled or executing.
     */
    ErtsProc2PortSigData *sigdp = NULL;
    ErtsPortTaskHandleList *ns_pthlp = NULL;
    ErtsRunQueue *xrunq;
    ErtsThrPrgrDelayHandle dhndl;
    ErtsRunQueue *runq;
    Port *pp;
    ErtsPortTask *ptp = NULL;
    erts_aint32_t act, add_flags;
    unsigned int prof_runnable_ports;

    ERTS_LC_ASSERT(!pthp || !erts_port_task_is_scheduled(pthp));

    ASSERT(is_internal_port(id));

    /* Look the port up under a thread-progress delay; unmanaged
       threads must take an explicit reference to keep it alive. */
    dhndl = erts_thr_progress_unmanaged_delay();

    pp = erts_port_lookup_raw(id);

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED) {
        if (pp)
            erts_port_inc_refc(pp);
        erts_thr_progress_unmanaged_continue(dhndl);
    }

    /* PROC_SIG tasks get their task struct from the signal data below;
       all other types allocate a fresh task here. */
    if (type != ERTS_PORT_TASK_PROC_SIG) {
        if (!pp)
            goto fail;

        ptp = port_task_alloc();

        ptp->type = type;
        ptp->u.alive.flags = 0;

        erts_atomic32_init_nob(&ptp->state, ERTS_PT_STATE_SCHEDULED);

        set_handle(ptp, pthp);
    }

    /* Consume the type-specific varargs (see function header). */
    switch (type) {
    case ERTS_PORT_TASK_INPUT:
    case ERTS_PORT_TASK_OUTPUT: {
        va_list argp;
        va_start(argp, type);
        ptp->u.alive.td.io.event = va_arg(argp, ErlDrvEvent);
#if ERTS_POLL_USE_SCHEDULER_POLLING
        ptp->u.alive.td.io.is_scheduler_event = va_arg(argp, int);
#endif
        va_end(argp);
#if ERTS_POLL_USE_SCHEDULER_POLLING
        if (ptp->u.alive.td.io.is_scheduler_event)
            erts_atomic_inc_relb(&erts_port_task_outstanding_io_tasks);
#endif
        break;
    }
    case ERTS_PORT_TASK_PROC_SIG: {
        va_list argp;
        va_start(argp, type);
        sigdp = va_arg(argp, ErtsProc2PortSigData *);
        /* The task struct is embedded in the signal data. */
        ptp = p2p_sig_data_to_task(sigdp);
        ptp->u.alive.td.psig.callback = va_arg(argp, ErtsProc2PortSigCallback);
        ptp->u.alive.flags |= va_arg(argp, int);
        va_end(argp);
        if (!pp)
            goto fail;

        if (!(ptp->u.alive.flags & ERTS_PT_FLG_NOSUSPEND))
            set_tmp_handle(ptp, pthp);
        else {
            /* NOSUSPEND signals need a handle kept in the port's
               nosuspend list so they can be aborted later. */
            ns_pthlp = erts_alloc(ERTS_ALC_T_PT_HNDL_LIST,
                                  sizeof(ErtsPortTaskHandleList));
            set_handle(ptp, &ns_pthlp->handle);
        }
        break;
    }
    default:
        break;
    }

    if (!enqueue_task(pp, ptp, sigdp, ns_pthlp, &act)) {
        reset_handle(ptp);
        /* Enqueue refused: a NOSUSPEND task on a busy (but live) port
           is aborted via the nosuspend path; otherwise plain failure. */
        if (ns_pthlp && !(act & ERTS_PTS_FLG_EXIT))
            goto abort_nosuspend;
        else
            goto fail;
    }

    add_flags = ERTS_PTS_FLG_HAVE_TASKS;
    if (ns_pthlp)
        add_flags |= ERTS_PTS_FLG_HAVE_NS_TASKS;

    /* Sched lock only needed to keep the profile event consistent
       with the flag transition below. */
    prof_runnable_ports = erts_system_profile_flags.runnable_ports;
    if (prof_runnable_ports)
        erts_port_task_sched_lock(&pp->sched);

    /* Publish the new flags; claim IN_RUNQ if the port is neither
       enqueued nor executing, in which case we must enqueue it. */
    while (1) {
        erts_aint32_t new, exp;

        if ((act & add_flags) == add_flags
            && (act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
            goto done; /* Done */

        new = exp = act;
        new |= add_flags;
        if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
            new |= ERTS_PTS_FLG_IN_RUNQ;

        act = erts_atomic32_cmpxchg_relb(&pp->sched.flags, new, exp);

        if (exp == act) {
            if (!(act & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
                break; /* Need to enqueue port */
            goto done; /* Done */
        }

        if (act & ERTS_PTS_FLG_EXIT)
            goto done; /* Died after our task insert... */
    }

    if (prof_runnable_ports) {
        if (!(act & ERTS_PTS_FLG_EXEC_IMM))
            profile_runnable_port(pp, am_active);
        erts_port_task_sched_unlock(&pp->sched);
        prof_runnable_ports = 0;
    }

    /* Enqueue port on run-queue */

    runq = erts_port_runq(pp);

    xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
    ERTS_LC_ASSERT(runq != xrunq);
    ERTS_LC_VERIFY_RQ(runq, pp);
    if (xrunq) {
        /* Emigrate port ... */
        erts_set_runq_port(pp, xrunq);
        erts_runq_unlock(runq);
        runq = erts_port_runq(pp);
    }

    enqueue_port(runq, pp);

    erts_runq_unlock(runq);

    erts_notify_inc_runq(runq);

 done:

    if (prof_runnable_ports)
        erts_port_task_sched_unlock(&pp->sched);

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
        erts_port_dec_refc(pp);

    return 0;

 abort_nosuspend:

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
        erts_port_dec_refc(pp);

    abort_nosuspend_task(pp, ptp->type, &ptp->u.alive.td, 0);

    ASSERT(ns_pthlp);
    erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);

    ASSERT(ptp);
    port_task_free(ptp);

    return 0;

 fail:

    if (dhndl != ERTS_THR_PRGR_DHANDLE_MANAGED)
        erts_port_dec_refc(pp);

    if (ptp) {
        if (ptp->type == ERTS_PORT_TASK_PROC_SIG)
            abort_signal_task(pp, ERTS_PROC2PORT_SIG_ABORT,
                              ptp->type, &ptp->u.alive.td, 0);
        port_task_free(ptp);
    }

    if (ns_pthlp)
        erts_free(ERTS_ALC_T_PT_HNDL_LIST, ns_pthlp);

    return -1;
}
1623
void
erts_port_task_free_port(Port *pp)
{
    /*
     * Mark 'pp' as exiting/free. If the port is neither enqueued on a
     * run-queue nor currently executing, cleanup of its remaining
     * tasks must be initiated here; otherwise the scheduler that owns
     * it will perform the cleanup.
     */
    erts_aint32_t flags;
    ErtsRunQueue *runq;

    ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
    ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));

    /* Lock order: run-queue lock, then sched lock. */
    runq = erts_port_runq(pp);
    erts_port_task_sched_lock(&pp->sched);
    flags = erts_atomic32_read_bor_relb(&pp->sched.flags,
                                        ERTS_PTS_FLG_EXIT);
    erts_port_task_sched_unlock(&pp->sched);
    /* Transition the port state to FREE (clearing the lifecycle flags
       listed in the mask). */
    erts_atomic32_read_bset_relb(&pp->state,
                                 (ERTS_PORT_SFLG_CONNECTED
                                  | ERTS_PORT_SFLG_EXITING
                                  | ERTS_PORT_SFLG_CLOSING
                                  | ERTS_PORT_SFLG_FREE),
                                 ERTS_PORT_SFLG_FREE);

    erts_runq_unlock(runq);

    if (!(flags & (ERTS_PTS_FLG_IN_RUNQ|ERTS_PTS_FLG_EXEC)))
        begin_port_cleanup(pp, NULL, NULL);
}
1650
1651 /*
1652 * Execute scheduled tasks of a port.
1653 *
1654 * erts_port_task_execute() is called by scheduler threads between
1655 * scheduling of processes. Run-queue lock should be held by caller.
1656 */
1657
void
erts_port_task_execute(ErtsRunQueue *runq, Port **curr_port_pp)
{
    /*
     * Pop one port from 'runq' and execute its tasks until it runs
     * out of tasks or exhausts its time slice (CONTEXT_REDS). Called
     * with the run-queue lock held; returns with it held again.
     * '*curr_port_pp' tracks the currently executing port.
     */
    Port *pp;
    ErtsPortTask *execq;
    int processing_busy_q;
    int vreds = 0;          /* virtual reds: callback overhead, not charged to runq */
    int reds = 0;
    int fpe_was_unmasked;
    erts_aint32_t state;
    int active;
    Uint64 start_time = 0;
    ErtsSchedulerData *esdp = runq->scheduler;
#if ERTS_POLL_USE_SCHEDULER_POLLING
    erts_aint_t io_tasks_executed = 0;
#endif
    ERTS_MSACC_PUSH_STATE_M();

    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));

    pp = pop_port(runq);
    if (!pp) {
        goto done;
    }

    ERTS_LC_VERIFY_RQ(runq, pp);

    erts_runq_unlock(runq);

    *curr_port_pp = pp;

    /* Scheduling statistics (erlang:system_flag(scheduling_statistics)). */
    if (erts_sched_stat.enabled) {
        Uint old = ERTS_PORT_SCHED_ID(pp, esdp->no);
        int migrated = old && old != esdp->no;

        erts_spin_lock(&erts_sched_stat.lock);
        erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_executed++;
        erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].executed++;
        if (migrated) {
            erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].total_migrated++;
            erts_sched_stat.prio[ERTS_PORT_PRIO_LEVEL].migrated++;
        }
        erts_spin_unlock(&erts_sched_stat.lock);
    }

    prepare_exec(pp, &execq, &processing_busy_q);

    erts_port_lock(pp);

    /* trace port scheduling, in */
    if (IS_TRACED_FL(pp, F_TRACE_SCHED_PORTS)) {
        trace_sched_ports(pp, am_in);
    }

    fpe_was_unmasked = erts_block_fpe();

    state = erts_atomic32_read_nob(&pp->state);
    pp->reds = ERTS_PORT_REDS_EXECUTE;
    ERTS_MSACC_SET_STATE_CACHED_M(ERTS_MSACC_STATE_PORT);
    /* Jump into the loop below to check the FREE state before
       selecting the first task. */
    goto begin_handle_tasks;

    while (1) {
        erts_aint32_t task_state;
        ErtsPortTask *ptp;

        ptp = select_task_for_exec(pp, &execq, &processing_busy_q);
        if (!ptp)
            break;

        /* Claim the task for execution; a lost race means it was
           aborted and should just be freed. */
        task_state = erts_atomic32_cmpxchg_nob(&ptp->state,
                                               ERTS_PT_STATE_EXECUTING,
                                               ERTS_PT_STATE_SCHEDULED);
        if (task_state != ERTS_PT_STATE_SCHEDULED) {
            ASSERT(task_state == ERTS_PT_STATE_ABORTED);
            goto aborted_port_task;
        }

        if (erts_system_monitor_long_schedule != 0) {
            start_time = erts_timestamp_millis();
        }

        ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
        ERTS_CHK_NO_PROC_LOCKS;
        ASSERT(pp->drv_ptr);

        /* Dispatch on task type; each case sets 'reds' to the cost
           of the executed callback. */
        switch (ptp->type) {
        case ERTS_PORT_TASK_TIMEOUT:
            reset_handle(ptp);
            if (!ERTS_PTMR_IS_TIMED_OUT(pp))
                reds = 0; /* timer was canceled/restarted; nothing to do */
            else {
                ERTS_PTMR_CLEAR(pp);
                reds = ERTS_PORT_REDS_TIMEOUT;
                if (!(state & ERTS_PORT_SFLGS_DEAD)) {
                    DTRACE_DRIVER(driver_timeout, pp);
                    LTTNG_DRIVER(driver_timeout, pp);
                    if (IS_TRACED_FL(pp, F_TRACE_RECEIVE))
                        trace_port(pp, am_receive, am_timeout);
                    (*pp->drv_ptr->timeout)((ErlDrvData) pp->drv_data);
                }
            }
            break;
        case ERTS_PORT_TASK_INPUT:
            reds = ERTS_PORT_REDS_INPUT;
            ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_input, pp);
            LTTNG_DRIVER(driver_ready_input, pp);
            /* NOTE some windows drivers use ->ready_input
               for input and output */
            (*pp->drv_ptr->ready_input)((ErlDrvData) pp->drv_data,
                                        ptp->u.alive.td.io.event);
            reset_executed_io_task_handle(pp, ptp);
#if ERTS_POLL_USE_SCHEDULER_POLLING
            if (ptp->u.alive.td.io.is_scheduler_event)
                io_tasks_executed++;
#endif
            break;
        case ERTS_PORT_TASK_OUTPUT:
            reds = ERTS_PORT_REDS_OUTPUT;
            ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            DTRACE_DRIVER(driver_ready_output, pp);
            LTTNG_DRIVER(driver_ready_output, pp);
            (*pp->drv_ptr->ready_output)((ErlDrvData) pp->drv_data,
                                         ptp->u.alive.td.io.event);
            reset_executed_io_task_handle(pp, ptp);
#if ERTS_POLL_USE_SCHEDULER_POLLING
            if (ptp->u.alive.td.io.is_scheduler_event)
                io_tasks_executed++;
#endif
            break;
        case ERTS_PORT_TASK_PROC_SIG: {
            ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
            reset_handle(ptp);
            ASSERT((state & ERTS_PORT_SFLGS_DEAD) == 0);
            if (!pp->sched.taskq.bpq)
                reds = ptp->u.alive.td.psig.callback(pp,
                                                     state,
                                                     ERTS_PROC2PORT_SIG_EXEC,
                                                     sigdp);
            else {
                /* Busy-port-queue accounting: record the data size
                   before the callback (which may free the signal). */
                ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
                reds = ptp->u.alive.td.psig.callback(pp,
                                                     state,
                                                     ERTS_PROC2PORT_SIG_EXEC,
                                                     sigdp);
                dequeued_proc2port_data(pp, size);
            }
            break;
        }
        case ERTS_PORT_TASK_DIST_CMD:
            reset_handle(ptp);
            reds = erts_dist_command(pp, CONTEXT_REDS - pp->reds);
            break;
        default:
            erts_exit(ERTS_ABORT_EXIT,
                      "Invalid port task type: %d\n",
                      (int) ptp->type);
            break;
        }

        reds += erts_port_driver_callback_epilogue(pp, &state);

        /* system_monitor long_schedule reporting. */
        if (start_time != 0) {
            Sint64 diff = erts_timestamp_millis() - start_time;
            if (diff > 0 && (Uint) diff > erts_system_monitor_long_schedule) {
                monitor_long_schedule_port(pp,ptp->type,(Uint) diff);
            }
        }
        start_time = 0;

    aborted_port_task:
        schedule_port_task_free(ptp);

    begin_handle_tasks:
        if (state & ERTS_PORT_SFLG_FREE) {
            /* Port died (possibly during a callback above); abort all
               remaining tasks and stop executing. */
            reds += ERTS_PORT_REDS_FREE;
            begin_port_cleanup(pp, &execq, &processing_busy_q);

            break;
        }

        vreds += ERTS_PORT_CALLBACK_VREDS;
        reds += ERTS_PORT_CALLBACK_VREDS;

        pp->reds += reds;
        reds = 0;

        if (pp->reds >= CONTEXT_REDS)
            break; /* time slice exhausted */
    }

    erts_unblock_fpe(fpe_was_unmasked);
    ERTS_MSACC_POP_STATE_M();

#if ERTS_POLL_USE_SCHEDULER_POLLING
    if (io_tasks_executed) {
        ASSERT(erts_atomic_read_nob(&erts_port_task_outstanding_io_tasks)
               >= io_tasks_executed);
        erts_atomic_add_relb(&erts_port_task_outstanding_io_tasks,
                             -1*io_tasks_executed);
    }
#endif

    ASSERT(runq == erts_get_runq_port(pp));

    active = finalize_exec(pp, &execq, processing_busy_q);

    /* Charge the run-queue only for real work, not callback overhead. */
    reds = pp->reds - vreds;

    erts_port_release(pp);

    *curr_port_pp = NULL;

    erts_runq_lock(runq);

    if (active) {
        /* Port still has tasks; re-enqueue it, possibly emigrating
           it to a less loaded run-queue. */
        ErtsRunQueue *xrunq;

        ASSERT(!(erts_atomic32_read_nob(&pp->state) & ERTS_PORT_SFLGS_DEAD));

        xrunq = erts_check_emigration_need(runq, ERTS_PORT_PRIO_LEVEL);
        ERTS_LC_ASSERT(runq != xrunq);
        ERTS_LC_VERIFY_RQ(runq, pp);
        if (!xrunq) {
            enqueue_port(runq, pp);
            /* No need to notify ourselves about inc in runq. */
        }
        else {
            /* Emigrate port... */
            erts_set_runq_port(pp, xrunq);
            erts_runq_unlock(runq);

            xrunq = erts_port_runq(pp);
            enqueue_port(xrunq, pp);
            erts_runq_unlock(xrunq);
            erts_notify_inc_runq(xrunq);

            erts_runq_lock(runq);
        }
    }

 done:

    runq->scheduler->reductions += reds;

    ERTS_LC_ASSERT(erts_lc_runq_is_locked(runq));
    ERTS_PORT_REDUCTIONS_EXECUTED(esdp, runq, reds);
}
1906
1907 static void
release_port(void * vport)1908 release_port(void *vport)
1909 {
1910 erts_port_dec_refc((Port *) vport);
1911 }
1912
1913 static void
schedule_release_port(void * vport)1914 schedule_release_port(void *vport) {
1915 Port *pp = (Port*)vport;
1916 /* This is only used when a port release was ordered from a non-scheduler */
1917 erts_schedule_thr_prgr_later_op(release_port,
1918 (void *) pp,
1919 &pp->common.u.release);
1920 }
1921
1922
1923 static void
begin_port_cleanup(Port * pp,ErtsPortTask ** execqp,int * processing_busy_q_p)1924 begin_port_cleanup(Port *pp, ErtsPortTask **execqp, int *processing_busy_q_p)
1925 {
1926 int i, max;
1927 ErtsPortTaskBusyCallerTable *tabp;
1928 ErtsPortTask *qs[3];
1929 ErtsPortTaskHandleList *free_nshp = NULL;
1930 ErtsProcList *plp;
1931
1932 ERTS_LC_ASSERT(erts_lc_is_port_locked(pp));
1933
1934 /*
1935 * Abort remaining tasks...
1936 *
1937 * We want to process queues in the following order in order
1938 * to preserve signal ordering guarantees:
1939 * 1. Local busy queue
1940 * 2. Local queue
1941 * 3. In queue
1942 */
1943
1944 max = 0;
1945 if (!execqp) {
1946 if (pp->sched.taskq.local.busy.first)
1947 qs[max++] = pp->sched.taskq.local.busy.first;
1948 if (pp->sched.taskq.local.first)
1949 qs[max++] = pp->sched.taskq.local.first;
1950 }
1951 else {
1952 if (*processing_busy_q_p) {
1953 if (*execqp)
1954 qs[max++] = *execqp;
1955 if (pp->sched.taskq.local.first)
1956 qs[max++] = pp->sched.taskq.local.first;
1957 }
1958 else {
1959 if (pp->sched.taskq.local.busy.first)
1960 qs[max++] = pp->sched.taskq.local.busy.first;
1961 if (*execqp)
1962 qs[max++] = *execqp;
1963 }
1964 *execqp = NULL;
1965 *processing_busy_q_p = 0;
1966 }
1967 pp->sched.taskq.local.busy.first = NULL;
1968 pp->sched.taskq.local.busy.last = NULL;
1969 pp->sched.taskq.local.first = NULL;
1970 tabp = pp->sched.taskq.local.busy.table;
1971 if (tabp) {
1972 int bix;
1973 for (bix = 0; bix < ERTS_PORT_TASK_BUSY_CALLER_TABLE_BUCKETS; bix++) {
1974 ErtsPortTaskBusyCaller *bcp = tabp->bucket[bix];
1975 while (bcp) {
1976 ErtsPortTaskBusyCaller *free_bcp = bcp;
1977 bcp = bcp->next;
1978 if (free_bcp != &tabp->pre_alloc_busy_caller)
1979 erts_free(ERTS_ALC_T_BUSY_CALLER, free_bcp);
1980 }
1981 }
1982
1983 busy_caller_table_free(tabp);
1984 pp->sched.taskq.local.busy.table = NULL;
1985 }
1986
1987 erts_port_task_sched_lock(&pp->sched);
1988 qs[max] = pp->sched.taskq.in.first;
1989 pp->sched.taskq.in.first = NULL;
1990 pp->sched.taskq.in.last = NULL;
1991 erts_port_task_sched_unlock(&pp->sched);
1992 if (qs[max])
1993 max++;
1994
1995 for (i = 0; i < max; i++) {
1996 while (1) {
1997 erts_aint32_t state;
1998 ErtsPortTask *ptp = qs[i];
1999 if (!ptp)
2000 break;
2001
2002 qs[i] = ptp->u.alive.next;
2003
2004 /* Normal case here is aborted tasks... */
2005 state = erts_atomic32_read_nob(&ptp->state);
2006 if (state == ERTS_PT_STATE_ABORTED)
2007 goto aborted_port_task;
2008
2009 state = erts_atomic32_cmpxchg_nob(&ptp->state,
2010 ERTS_PT_STATE_EXECUTING,
2011 ERTS_PT_STATE_SCHEDULED);
2012 if (state != ERTS_PT_STATE_SCHEDULED) {
2013 ASSERT(state == ERTS_PT_STATE_ABORTED);
2014 goto aborted_port_task;
2015 }
2016
2017 reset_handle(ptp);
2018
2019 switch (ptp->type) {
2020 case ERTS_PORT_TASK_TIMEOUT:
2021 break;
2022 case ERTS_PORT_TASK_INPUT:
2023 erts_stale_drv_select(pp->common.id,
2024 ERTS_Port2ErlDrvPort(pp),
2025 ptp->u.alive.td.io.event,
2026 DO_READ,
2027 1);
2028 break;
2029 case ERTS_PORT_TASK_OUTPUT:
2030 erts_stale_drv_select(pp->common.id,
2031 ERTS_Port2ErlDrvPort(pp),
2032 ptp->u.alive.td.io.event,
2033 DO_WRITE,
2034 1);
2035 break;
2036 case ERTS_PORT_TASK_DIST_CMD:
2037 break;
2038 case ERTS_PORT_TASK_PROC_SIG: {
2039 ErtsProc2PortSigData *sigdp = &ptp->u.alive.td.psig.data;
2040 if (!pp->sched.taskq.bpq)
2041 ptp->u.alive.td.psig.callback(NULL,
2042 ERTS_PORT_SFLG_INVALID,
2043 ERTS_PROC2PORT_SIG_ABORT_CLOSED,
2044 sigdp);
2045 else {
2046 ErlDrvSizeT size = erts_proc2port_sig_command_data_size(sigdp);
2047 ptp->u.alive.td.psig.callback(NULL,
2048 ERTS_PORT_SFLG_INVALID,
2049 ERTS_PROC2PORT_SIG_ABORT_CLOSED,
2050 sigdp);
2051 aborted_proc2port_data(pp, size);
2052 }
2053 break;
2054 }
2055 default:
2056 erts_exit(ERTS_ABORT_EXIT,
2057 "Invalid port task type: %d\n",
2058 (int) ptp->type);
2059 }
2060
2061 aborted_port_task:
2062 schedule_port_task_free(ptp);
2063 }
2064 }
2065
2066 erts_atomic32_read_band_nob(&pp->sched.flags,
2067 ~(ERTS_PTS_FLG_HAVE_BUSY_TASKS
2068 |ERTS_PTS_FLG_HAVE_TASKS
2069 |ERTS_PTS_FLGS_BUSY));
2070
2071 erts_port_task_sched_lock(&pp->sched);
2072
2073 /* Cleanup nosuspend handles... */
2074 free_nshp = (pp->sched.taskq.local.busy.nosuspend
2075 ? get_free_nosuspend_handles(pp)
2076 : NULL);
2077 ASSERT(!pp->sched.taskq.local.busy.nosuspend);
2078
2079 /* Make sure not to leave any processes suspended on the port... */
2080 plp = pp->suspended;
2081 pp->suspended = NULL;
2082
2083 erts_port_task_sched_unlock(&pp->sched);
2084
2085 if (free_nshp)
2086 free_nosuspend_handles(free_nshp);
2087
2088 if (erts_proclist_fetch(&plp, NULL)) {
2089 #ifdef USE_VM_PROBES
2090 if (DTRACE_ENABLED(process_port_unblocked)) {
2091 DTRACE_CHARBUF(port_str, 16);
2092 DTRACE_CHARBUF(pid_str, 16);
2093 ErtsProcList* plp2 = plp;
2094
2095 erts_snprintf(port_str, sizeof(DTRACE_CHARBUF_NAME(port_str)), "%T", pp->common.id);
2096 while (plp2 != NULL) {
2097 erts_snprintf(pid_str, sizeof(DTRACE_CHARBUF_NAME(pid_str)), "%T", plp2->u.pid);
2098 DTRACE2(process_port_unblocked, pid_str, port_str);
2099 }
2100 }
2101 #endif
2102 erts_resume_processes(plp);
2103 }
2104
2105 /*
2106 * Schedule cleanup of port structure...
2107 */
2108 /* We might not be a scheduler, eg. traceing to port we are sys_msg_dispatcher */
2109 if (!erts_get_scheduler_data()) {
2110 erts_schedule_misc_aux_work(1, schedule_release_port, (void*)pp);
2111 } else {
2112 /* Has to be more or less immediate to release any driver */
2113 erts_schedule_thr_prgr_later_op(release_port,
2114 (void *) pp,
2115 &pp->common.u.release);
2116 }
2117 }
2118
2119
2120 void
erts_enqueue_port(ErtsRunQueue * rq,Port * pp)2121 erts_enqueue_port(ErtsRunQueue *rq, Port *pp)
2122 {
2123 ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
2124 ASSERT(rq == erts_get_runq_port(pp));
2125 ASSERT(erts_atomic32_read_nob(&pp->sched.flags) & ERTS_PTS_FLG_IN_RUNQ);
2126 enqueue_port(rq, pp);
2127 }
2128
2129 Port *
erts_dequeue_port(ErtsRunQueue * rq)2130 erts_dequeue_port(ErtsRunQueue *rq)
2131 {
2132 Port *pp;
2133 ERTS_LC_ASSERT(erts_lc_runq_is_locked(rq));
2134 pp = pop_port(rq);
2135 ASSERT(!pp || rq == erts_get_runq_port(pp));
2136 ASSERT(!pp || (erts_atomic32_read_nob(&pp->sched.flags)
2137 & ERTS_PTS_FLG_IN_RUNQ));
2138 return pp;
2139 }
2140
2141
2142 /*
2143 * Initialize the module.
2144 */
void
erts_port_task_init(void)
{
#if ERTS_POLL_USE_SCHEDULER_POLLING
    /* Counter of outstanding I/O tasks driven by scheduler polling. */
    erts_atomic_init_nob(&erts_port_task_outstanding_io_tasks,
                         (erts_aint_t) 0);
#endif
    /* One port-task allocator instance per thread that may allocate
       port tasks: schedulers, poll threads, and the aux thread. */
    init_port_task_alloc(erts_no_schedulers + erts_no_poll_threads
                         + 1); /* aux_thread */
    init_busy_caller_table_alloc();
}
2156