1 /*
2 * %CopyrightBegin%
3 *
4 * Copyright Ericsson AB 2006-2020. All Rights Reserved.
5 *
6 * Licensed under the Apache License, Version 2.0 (the "License");
7 * you may not use this file except in compliance with the License.
8 * You may obtain a copy of the License at
9 *
10 * http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing, software
13 * distributed under the License is distributed on an "AS IS" BASIS,
14 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15 * See the License for the specific language governing permissions and
16 * limitations under the License.
17 *
18 * %CopyrightEnd%
19 */
20
21 /*
22 * Description: Scheduling of port tasks
23 *
24 * Author: Rickard Green
25 */
26
27 #ifndef ERTS_PORT_TASK_H_BASIC_TYPES__
28 #define ERTS_PORT_TASK_H_BASIC_TYPES__
29 #include "erl_sys_driver.h"
30 #include "erl_threads.h"
31 #define ERL_PORT_GET_PORT_TYPE_ONLY__
32 #include "erl_port.h"
33 #undef ERL_PORT_GET_PORT_TYPE_ONLY__
34 typedef erts_atomic_t ErtsPortTaskHandle;
35 #endif
36
37 #ifndef ERTS_PORT_TASK_ONLY_BASIC_TYPES__
38 #ifndef ERL_PORT_TASK_H__
39 #define ERL_PORT_TASK_H__
40
41 #include "erl_poll.h"
42
/*
 * ERTS_INCLUDE_SCHEDULER_INTERNALS gates the scheduler-internal parts of
 * this header. It is (re)defined only when the including translation unit
 * defines ERL_PROCESS_C__, ERL_PORT_TASK_C__ or ERL_IO_C__, or when
 * ERTS_GLB_INLINE_INCL_FUNC_DEF is non-zero and
 * ERTS_DO_INCL_GLB_INLINE_FUNC_DEF is defined.
 */
#undef ERTS_INCLUDE_SCHEDULER_INTERNALS
#if defined(ERL_PROCESS_C__) || defined(ERL_PORT_TASK_C__) \
    || defined(ERL_IO_C__) \
    || (ERTS_GLB_INLINE_INCL_FUNC_DEF \
        && defined(ERTS_DO_INCL_GLB_INLINE_FUNC_DEF))
#  define ERTS_INCLUDE_SCHEDULER_INTERNALS
#endif
51
/*
 * Port task flags (ERTS_PT_FLG_*). Every flag owns one bit, so several
 * flags can be OR:ed together into one int.
 * NOTE(review): the meaning of the individual flags is not visible in
 * this header; check their users before relying on the names.
 */
#define ERTS_PT_FLG_WAIT_BUSY           (1 << 0)
#define ERTS_PT_FLG_SIG_DEP             (1 << 1)
#define ERTS_PT_FLG_NOSUSPEND           (1 << 2)
#define ERTS_PT_FLG_REF                 (1 << 3)
#define ERTS_PT_FLG_BAD_OUTPUT          (1 << 4)
57
/*
 * Kind of port task; passed as the type argument of
 * erts_port_task_schedule(). The numeric values are spelled out
 * explicitly so that they cannot shift if an entry is ever inserted.
 */
typedef enum {
    ERTS_PORT_TASK_INPUT    = 0,
    ERTS_PORT_TASK_OUTPUT   = 1,
    ERTS_PORT_TASK_TIMEOUT  = 2,
    ERTS_PORT_TASK_DIST_CMD = 3,
    ERTS_PORT_TASK_PROC_SIG = 4
} ErtsPortTaskType;
65
/*
 * Port task scheduling state flags (ERTS_PTS_FLG_*). These are the bits
 * kept in the 32-bit atomic ErtsPortTaskSched.flags word; each flag is a
 * distinct bit of type erts_aint32_t.
 * NOTE(review): per-flag semantics are not visible in this header.
 */
#define ERTS_PTS_FLG_IN_RUNQ                (((erts_aint32_t) 1) <<  0)
#define ERTS_PTS_FLG_EXEC                   (((erts_aint32_t) 1) <<  1)
#define ERTS_PTS_FLG_HAVE_TASKS             (((erts_aint32_t) 1) <<  2)
#define ERTS_PTS_FLG_EXIT                   (((erts_aint32_t) 1) <<  3)
#define ERTS_PTS_FLG_BUSY_PORT              (((erts_aint32_t) 1) <<  4)
#define ERTS_PTS_FLG_BUSY_PORT_Q            (((erts_aint32_t) 1) <<  5)
#define ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q  (((erts_aint32_t) 1) <<  6)
#define ERTS_PTS_FLG_HAVE_BUSY_TASKS        (((erts_aint32_t) 1) <<  7)
#define ERTS_PTS_FLG_HAVE_NS_TASKS          (((erts_aint32_t) 1) <<  8)
#define ERTS_PTS_FLG_PARALLELISM            (((erts_aint32_t) 1) <<  9)
#define ERTS_PTS_FLG_FORCE_SCHED            (((erts_aint32_t) 1) << 10)
#define ERTS_PTS_FLG_EXITING                (((erts_aint32_t) 1) << 11)
#define ERTS_PTS_FLG_EXEC_IMM               (((erts_aint32_t) 1) << 12)

/* Mask covering both "busy" flags (port busy and port queue busy). */
#define ERTS_PTS_FLGS_BUSY \
    (ERTS_PTS_FLG_BUSY_PORT_Q | ERTS_PTS_FLG_BUSY_PORT)

/*
 * Mask of the flags grouped under the name "force schedule op".
 * Members are listed in ascending bit order.
 */
#define ERTS_PTS_FLGS_FORCE_SCHEDULE_OP \
    (ERTS_PTS_FLG_EXEC \
     | ERTS_PTS_FLG_HAVE_TASKS \
     | ERTS_PTS_FLG_EXIT \
     | ERTS_PTS_FLG_HAVE_BUSY_TASKS \
     | ERTS_PTS_FLG_FORCE_SCHED \
     | ERTS_PTS_FLG_EXITING \
     | ERTS_PTS_FLG_EXEC_IMM)
91
/*
 * Default limits of the busy port queue. They are written into
 * ErtsPortTaskBusyPortQ.low and .high by erts_port_task_pre_init_sched().
 * NOTE(review): the unit is presumably bytes of queued data — confirm.
 */
#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW  4096
#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH 8192
94
95 typedef struct {
96 ErlDrvSizeT high;
97 erts_atomic_t low;
98 erts_atomic_t size;
99 } ErtsPortTaskBusyPortQ;
100
/*
 * Opaque types: only pointers to them appear in this header, the
 * structures themselves are declared elsewhere.
 */
typedef struct ErtsPortTask_                ErtsPortTask;
typedef struct ErtsPortTaskBusyCallerTable_ ErtsPortTaskBusyCallerTable;
typedef struct ErtsPortTaskHandleList_      ErtsPortTaskHandleList;
104
105 typedef struct {
106 Port *next;
107 struct {
108 struct {
109 struct {
110 ErtsPortTask *first;
111 ErtsPortTask *last;
112 ErtsPortTaskBusyCallerTable *table;
113 ErtsPortTaskHandleList *nosuspend;
114 } busy;
115 ErtsPortTask *first;
116 } local;
117 struct {
118 ErtsPortTask *first;
119 ErtsPortTask *last;
120 } in;
121 ErtsPortTaskBusyPortQ *bpq;
122 } taskq;
123 erts_atomic32_t flags;
124 erts_mtx_t mtx;
125 } ErtsPortTaskSched;
126
127 ERTS_GLB_INLINE void erts_port_task_handle_init(ErtsPortTaskHandle *pthp);
128 ERTS_GLB_INLINE int erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp);
129 ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
130 ErtsPortTaskBusyPortQ *bpq);
131 ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp,
132 Eterm id);
133 ERTS_GLB_INLINE void erts_port_task_fini_sched(ErtsPortTaskSched *ptsp);
134 ERTS_GLB_INLINE void erts_port_task_sched_lock(ErtsPortTaskSched *ptsp);
135 ERTS_GLB_INLINE void erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp);
136 ERTS_GLB_INLINE int erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp);
137 ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp);
138
139 #if defined(ERTS_INCLUDE_SCHEDULER_INTERNALS) && ERTS_POLL_USE_SCHEDULER_POLLING
140 ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void);
141 /* NOTE: Do not access any of the exported variables directly */
142 extern erts_atomic_t erts_port_task_outstanding_io_tasks;
143 #endif
144
145 #if ERTS_GLB_INLINE_INCL_FUNC_DEF
146
147 ERTS_GLB_INLINE void
erts_port_task_handle_init(ErtsPortTaskHandle * pthp)148 erts_port_task_handle_init(ErtsPortTaskHandle *pthp)
149 {
150 erts_atomic_init_nob(pthp, (erts_aint_t) NULL);
151 }
152
153 ERTS_GLB_INLINE int
erts_port_task_is_scheduled(ErtsPortTaskHandle * pthp)154 erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp)
155 {
156 return ((void *) erts_atomic_read_acqb(pthp)) != NULL;
157 }
158
erts_port_task_pre_init_sched(ErtsPortTaskSched * ptsp,ErtsPortTaskBusyPortQ * bpq)159 ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
160 ErtsPortTaskBusyPortQ *bpq)
161 {
162 if (bpq) {
163 erts_aint_t low = (erts_aint_t) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW;
164 erts_atomic_init_nob(&bpq->low, low);
165 bpq->high = (ErlDrvSizeT) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH;
166 erts_atomic_init_nob(&bpq->size, (erts_aint_t) 0);
167 }
168 ptsp->taskq.bpq = bpq;
169 }
170
171 ERTS_GLB_INLINE void
erts_port_task_init_sched(ErtsPortTaskSched * ptsp,Eterm instr_id)172 erts_port_task_init_sched(ErtsPortTaskSched *ptsp, Eterm instr_id)
173 {
174 char *lock_str = "port_sched_lock";
175 ptsp->next = NULL;
176 ptsp->taskq.local.busy.first = NULL;
177 ptsp->taskq.local.busy.last = NULL;
178 ptsp->taskq.local.busy.table = NULL;
179 ptsp->taskq.local.busy.nosuspend = NULL;
180 ptsp->taskq.local.first = NULL;
181 ptsp->taskq.in.first = NULL;
182 ptsp->taskq.in.last = NULL;
183 erts_atomic32_init_nob(&ptsp->flags, 0);
184 erts_mtx_init(&ptsp->mtx, lock_str, instr_id, ERTS_LOCK_FLAGS_CATEGORY_IO);
185 }
186
187 ERTS_GLB_INLINE void
erts_port_task_sched_lock(ErtsPortTaskSched * ptsp)188 erts_port_task_sched_lock(ErtsPortTaskSched *ptsp)
189 {
190 erts_mtx_lock(&ptsp->mtx);
191 }
192
193 ERTS_GLB_INLINE void
erts_port_task_sched_unlock(ErtsPortTaskSched * ptsp)194 erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp)
195 {
196 erts_mtx_unlock(&ptsp->mtx);
197 }
198
199 ERTS_GLB_INLINE int
erts_port_task_sched_lock_is_locked(ErtsPortTaskSched * ptsp)200 erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp)
201 {
202 #if defined(ERTS_ENABLE_LOCK_CHECK)
203 return erts_lc_mtx_is_locked(&ptsp->mtx);
204 #else
205 return 0;
206 #endif
207 }
208
209
210 ERTS_GLB_INLINE void
erts_port_task_fini_sched(ErtsPortTaskSched * ptsp)211 erts_port_task_fini_sched(ErtsPortTaskSched *ptsp)
212 {
213 erts_mtx_destroy(&ptsp->mtx);
214 }
215
216 ERTS_GLB_INLINE void
erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched * ptsp)217 erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp)
218 {
219 erts_atomic32_read_bor_nob(&ptsp->flags, ERTS_PTS_FLG_EXITING);
220 }
221
#if defined(ERTS_INCLUDE_SCHEDULER_INTERNALS) && ERTS_POLL_USE_SCHEDULER_POLLING
/*
 * Tell whether the global erts_port_task_outstanding_io_tasks counter is
 * currently non-zero.
 *
 * Returns non-zero if it is, otherwise 0.
 */
ERTS_GLB_INLINE int
erts_port_task_have_outstanding_io_tasks(void)
{
    erts_aint_t outstanding;

    outstanding = erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks);
    return outstanding != 0;
}
#endif
230
231 #endif
232
233 #ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
234 void erts_port_task_execute(ErtsRunQueue *, Port **);
235 void erts_port_task_init(void);
236 #endif
237
238 /* generated for 'port_task' quick allocator */
239 void erts_port_task_pre_alloc_init_thread(void);
240
241 void erts_port_task_tmp_handle_detach(ErtsPortTaskHandle *);
242 int erts_port_task_abort(ErtsPortTaskHandle *);
243
244 void erts_port_task_abort_nosuspend_tasks(Port *);
245
246 int erts_port_task_schedule(Eterm,
247 ErtsPortTaskHandle *,
248 ErtsPortTaskType,
249 ...);
250 void erts_port_task_free_port(Port *);
251 ErtsProc2PortSigData *erts_port_task_alloc_p2p_sig_data(void);
252 ErtsProc2PortSigData *erts_port_task_alloc_p2p_sig_data_extra(size_t extra, void **extra_ptr);
253 void erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData *sigdp);
254
255 void erts_enqueue_port(ErtsRunQueue *rq, Port *pp);
256 Port *erts_dequeue_port(ErtsRunQueue *rq);
257 #undef ERTS_INCLUDE_SCHEDULER_INTERNALS
258 #endif /* ERL_PORT_TASK_H__ */
259 #endif /* ERTS_PORT_TASK_ONLY_BASIC_TYPES__ */
260