/*
 * %CopyrightBegin%
 *
 * Copyright Ericsson AB 2006-2020. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * %CopyrightEnd%
 */
20 
/*
 * Description: Scheduling of port tasks
 *
 * Author:      Rickard Green
 */
26 
27 #ifndef ERTS_PORT_TASK_H_BASIC_TYPES__
28 #define ERTS_PORT_TASK_H_BASIC_TYPES__
29 #include "erl_sys_driver.h"
30 #include "erl_threads.h"
31 #define ERL_PORT_GET_PORT_TYPE_ONLY__
32 #include "erl_port.h"
33 #undef ERL_PORT_GET_PORT_TYPE_ONLY__
34 typedef erts_atomic_t ErtsPortTaskHandle;
35 #endif
36 
37 #ifndef ERTS_PORT_TASK_ONLY_BASIC_TYPES__
38 #ifndef ERL_PORT_TASK_H__
39 #define ERL_PORT_TASK_H__
40 
41 #include "erl_poll.h"
42 
/*
 * Decide whether the scheduler-internal parts of this header are exposed.
 * Any earlier definition is dropped first. The macro is then defined when
 * the including file has defined ERL_PROCESS_C__, ERL_PORT_TASK_C__ or
 * ERL_IO_C__, or when global inline function definitions are being emitted
 * (ERTS_GLB_INLINE_INCL_FUNC_DEF is non-zero and
 * ERTS_DO_INCL_GLB_INLINE_FUNC_DEF is defined).
 */
#undef ERTS_INCLUDE_SCHEDULER_INTERNALS
#if (defined(ERL_PROCESS_C__) \
     || defined(ERL_PORT_TASK_C__) \
     || defined(ERL_IO_C__) \
     || (ERTS_GLB_INLINE_INCL_FUNC_DEF \
	 && defined(ERTS_DO_INCL_GLB_INLINE_FUNC_DEF)))
#define ERTS_INCLUDE_SCHEDULER_INTERNALS
#endif
51 
/*
 * ERTS_PT_FLG_* bit flags. Each flag is a distinct single bit of an int,
 * so flags can be combined with bitwise OR. Where they are consumed is
 * not visible in this header.
 */
#define ERTS_PT_FLG_WAIT_BUSY		(1 << 0)
#define ERTS_PT_FLG_SIG_DEP		(1 << 1)
#define ERTS_PT_FLG_NOSUSPEND		(1 << 2)
#define ERTS_PT_FLG_REF			(1 << 3)
#define ERTS_PT_FLG_BAD_OUTPUT		(1 << 4)
57 
/*
 * Kind of port task. This is the type of the third argument of
 * erts_port_task_schedule().
 */
typedef enum {
    ERTS_PORT_TASK_INPUT = 0,
    ERTS_PORT_TASK_OUTPUT = 1,
    ERTS_PORT_TASK_TIMEOUT,	/* 2 (implicit) */
    ERTS_PORT_TASK_DIST_CMD,	/* 3 (implicit) */
    ERTS_PORT_TASK_PROC_SIG	/* 4 (implicit) */
} ErtsPortTaskType;
65 
/*
 * ERTS_PTS_FLG_* bit flags, typed as erts_aint32_t. In this header
 * ERTS_PTS_FLG_EXITING is OR-ed into ErtsPortTaskSched.flags by
 * erts_port_task_sched_enter_exiting_state(); presumably the other bits
 * live in that same word -- confirm against erl_port_task.c.
 */
#define ERTS_PTS_FLG_IN_RUNQ			(((erts_aint32_t) 1) <<  0)
#define ERTS_PTS_FLG_EXEC			(((erts_aint32_t) 1) <<  1)
#define ERTS_PTS_FLG_HAVE_TASKS			(((erts_aint32_t) 1) <<  2)
#define ERTS_PTS_FLG_EXIT			(((erts_aint32_t) 1) <<  3)
#define ERTS_PTS_FLG_BUSY_PORT			(((erts_aint32_t) 1) <<  4)
#define ERTS_PTS_FLG_BUSY_PORT_Q		(((erts_aint32_t) 1) <<  5)
#define ERTS_PTS_FLG_CHK_UNSET_BUSY_PORT_Q	(((erts_aint32_t) 1) <<  6)
#define ERTS_PTS_FLG_HAVE_BUSY_TASKS		(((erts_aint32_t) 1) <<  7)
#define ERTS_PTS_FLG_HAVE_NS_TASKS		(((erts_aint32_t) 1) <<  8)
#define ERTS_PTS_FLG_PARALLELISM		(((erts_aint32_t) 1) <<  9)
#define ERTS_PTS_FLG_FORCE_SCHED		(((erts_aint32_t) 1) << 10)
#define ERTS_PTS_FLG_EXITING			(((erts_aint32_t) 1) << 11)
#define ERTS_PTS_FLG_EXEC_IMM			(((erts_aint32_t) 1) << 12)
79 
/* Mask covering both "busy" bits: BUSY_PORT and BUSY_PORT_Q. */
#define ERTS_PTS_FLGS_BUSY \
    (ERTS_PTS_FLG_BUSY_PORT | ERTS_PTS_FLG_BUSY_PORT_Q)

/*
 * Mask of the EXIT, HAVE_BUSY_TASKS, HAVE_TASKS, EXEC, EXEC_IMM,
 * FORCE_SCHED and EXITING bits. Judging by the name, an operation is
 * forced to be scheduled when any of these is set; the code that tests
 * the mask is not part of this header.
 */
#define ERTS_PTS_FLGS_FORCE_SCHEDULE_OP		\
    (ERTS_PTS_FLG_EXIT				\
     | ERTS_PTS_FLG_HAVE_BUSY_TASKS		\
     | ERTS_PTS_FLG_HAVE_TASKS			\
     | ERTS_PTS_FLG_EXEC			\
     | ERTS_PTS_FLG_EXEC_IMM			\
     | ERTS_PTS_FLG_FORCE_SCHED			\
     | ERTS_PTS_FLG_EXITING)
91 
/*
 * Default limits for a busy port queue. erts_port_task_pre_init_sched()
 * stores them in the `high` and `low` members of ErtsPortTaskBusyPortQ.
 */
#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH			8192
#define ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW			4096
94 
95 typedef struct {
96     ErlDrvSizeT high;
97     erts_atomic_t low;
98     erts_atomic_t size;
99 }  ErtsPortTaskBusyPortQ;
100 
/*
 * Opaque types: only pointers to them are used in this header, and the
 * struct bodies are not defined here.
 */
typedef struct ErtsPortTask_ ErtsPortTask;
typedef struct ErtsPortTaskBusyCallerTable_ ErtsPortTaskBusyCallerTable;
typedef struct ErtsPortTaskHandleList_ ErtsPortTaskHandleList;
104 
105 typedef struct {
106     Port *next;
107     struct {
108 	struct {
109 	    struct {
110 		ErtsPortTask *first;
111 		ErtsPortTask *last;
112 		ErtsPortTaskBusyCallerTable *table;
113 		ErtsPortTaskHandleList *nosuspend;
114 	    } busy;
115 	    ErtsPortTask *first;
116 	} local;
117 	struct {
118 	    ErtsPortTask *first;
119 	    ErtsPortTask *last;
120 	} in;
121 	ErtsPortTaskBusyPortQ *bpq;
122     } taskq;
123     erts_atomic32_t flags;
124     erts_mtx_t mtx;
125 } ErtsPortTaskSched;
126 
127 ERTS_GLB_INLINE void erts_port_task_handle_init(ErtsPortTaskHandle *pthp);
128 ERTS_GLB_INLINE int erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp);
129 ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
130 						   ErtsPortTaskBusyPortQ *bpq);
131 ERTS_GLB_INLINE void erts_port_task_init_sched(ErtsPortTaskSched *ptsp,
132 					       Eterm id);
133 ERTS_GLB_INLINE void erts_port_task_fini_sched(ErtsPortTaskSched *ptsp);
134 ERTS_GLB_INLINE void erts_port_task_sched_lock(ErtsPortTaskSched *ptsp);
135 ERTS_GLB_INLINE void erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp);
136 ERTS_GLB_INLINE int erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp);
137 ERTS_GLB_INLINE void erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp);
138 
/* Declared only for scheduler-internal includers, and only when
 * ERTS_POLL_USE_SCHEDULER_POLLING is non-zero. */
#if defined(ERTS_INCLUDE_SCHEDULER_INTERNALS) && ERTS_POLL_USE_SCHEDULER_POLLING
ERTS_GLB_INLINE int erts_port_task_have_outstanding_io_tasks(void);
/* NOTE: Do not access the exported variable below directly; read it
 * through erts_port_task_have_outstanding_io_tasks(). */
extern erts_atomic_t erts_port_task_outstanding_io_tasks;
#endif
144 
145 #if ERTS_GLB_INLINE_INCL_FUNC_DEF
146 
147 ERTS_GLB_INLINE void
erts_port_task_handle_init(ErtsPortTaskHandle * pthp)148 erts_port_task_handle_init(ErtsPortTaskHandle *pthp)
149 {
150     erts_atomic_init_nob(pthp, (erts_aint_t) NULL);
151 }
152 
153 ERTS_GLB_INLINE int
erts_port_task_is_scheduled(ErtsPortTaskHandle * pthp)154 erts_port_task_is_scheduled(ErtsPortTaskHandle *pthp)
155 {
156     return ((void *) erts_atomic_read_acqb(pthp)) != NULL;
157 }
158 
erts_port_task_pre_init_sched(ErtsPortTaskSched * ptsp,ErtsPortTaskBusyPortQ * bpq)159 ERTS_GLB_INLINE void erts_port_task_pre_init_sched(ErtsPortTaskSched *ptsp,
160 						   ErtsPortTaskBusyPortQ *bpq)
161 {
162     if (bpq) {
163 	erts_aint_t low = (erts_aint_t) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_LOW;
164 	erts_atomic_init_nob(&bpq->low, low);
165  	bpq->high = (ErlDrvSizeT) ERTS_PORT_TASK_DEFAULT_BUSY_PORT_Q_HIGH;
166 	erts_atomic_init_nob(&bpq->size, (erts_aint_t) 0);
167     }
168     ptsp->taskq.bpq = bpq;
169 }
170 
171 ERTS_GLB_INLINE void
erts_port_task_init_sched(ErtsPortTaskSched * ptsp,Eterm instr_id)172 erts_port_task_init_sched(ErtsPortTaskSched *ptsp, Eterm instr_id)
173 {
174     char *lock_str = "port_sched_lock";
175     ptsp->next = NULL;
176     ptsp->taskq.local.busy.first = NULL;
177     ptsp->taskq.local.busy.last = NULL;
178     ptsp->taskq.local.busy.table = NULL;
179     ptsp->taskq.local.busy.nosuspend = NULL;
180     ptsp->taskq.local.first = NULL;
181     ptsp->taskq.in.first = NULL;
182     ptsp->taskq.in.last = NULL;
183     erts_atomic32_init_nob(&ptsp->flags, 0);
184     erts_mtx_init(&ptsp->mtx, lock_str, instr_id, ERTS_LOCK_FLAGS_CATEGORY_IO);
185 }
186 
187 ERTS_GLB_INLINE void
erts_port_task_sched_lock(ErtsPortTaskSched * ptsp)188 erts_port_task_sched_lock(ErtsPortTaskSched *ptsp)
189 {
190     erts_mtx_lock(&ptsp->mtx);
191 }
192 
193 ERTS_GLB_INLINE void
erts_port_task_sched_unlock(ErtsPortTaskSched * ptsp)194 erts_port_task_sched_unlock(ErtsPortTaskSched *ptsp)
195 {
196     erts_mtx_unlock(&ptsp->mtx);
197 }
198 
199 ERTS_GLB_INLINE int
erts_port_task_sched_lock_is_locked(ErtsPortTaskSched * ptsp)200 erts_port_task_sched_lock_is_locked(ErtsPortTaskSched *ptsp)
201 {
202 #if defined(ERTS_ENABLE_LOCK_CHECK)
203     return erts_lc_mtx_is_locked(&ptsp->mtx);
204 #else
205     return 0;
206 #endif
207 }
208 
209 
210 ERTS_GLB_INLINE void
erts_port_task_fini_sched(ErtsPortTaskSched * ptsp)211 erts_port_task_fini_sched(ErtsPortTaskSched *ptsp)
212 {
213     erts_mtx_destroy(&ptsp->mtx);
214 }
215 
216 ERTS_GLB_INLINE void
erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched * ptsp)217 erts_port_task_sched_enter_exiting_state(ErtsPortTaskSched *ptsp)
218 {
219     erts_atomic32_read_bor_nob(&ptsp->flags, ERTS_PTS_FLG_EXITING);
220 }
221 
#if defined(ERTS_INCLUDE_SCHEDULER_INTERNALS) && ERTS_POLL_USE_SCHEDULER_POLLING
/*
 * Returns non-zero when erts_port_task_outstanding_io_tasks, read with
 * erts_atomic_read_acqb(), is different from 0; otherwise returns 0.
 */
ERTS_GLB_INLINE int
erts_port_task_have_outstanding_io_tasks(void)
{
    return (erts_atomic_read_acqb(&erts_port_task_outstanding_io_tasks)
	    != 0);
}
#endif
230 
231 #endif
232 
/* Entry points visible only to scheduler-internal includers. */
#ifdef ERTS_INCLUDE_SCHEDULER_INTERNALS
void erts_port_task_execute(ErtsRunQueue *, Port **);
void erts_port_task_init(void);
#endif
237 
238 /* generated for 'port_task' quick allocator */
239 void erts_port_task_pre_alloc_init_thread(void);
240 
241 void erts_port_task_tmp_handle_detach(ErtsPortTaskHandle *);
242 int erts_port_task_abort(ErtsPortTaskHandle *);
243 
244 void erts_port_task_abort_nosuspend_tasks(Port *);
245 
246 int erts_port_task_schedule(Eterm,
247 			    ErtsPortTaskHandle *,
248 			    ErtsPortTaskType,
249 			    ...);
250 void erts_port_task_free_port(Port *);
251 ErtsProc2PortSigData *erts_port_task_alloc_p2p_sig_data(void);
252 ErtsProc2PortSigData *erts_port_task_alloc_p2p_sig_data_extra(size_t extra, void **extra_ptr);
253 void erts_port_task_free_p2p_sig_data(ErtsProc2PortSigData *sigdp);
254 
255 void erts_enqueue_port(ErtsRunQueue *rq, Port *pp);
256 Port *erts_dequeue_port(ErtsRunQueue *rq);
257 #undef ERTS_INCLUDE_SCHEDULER_INTERNALS
258 #endif /* ERL_PORT_TASK_H__ */
259 #endif /* ERTS_PORT_TASK_ONLY_BASIC_TYPES__ */
260