1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2
3 /* Monkey HTTP Server (Duda I/O)
4 * -----------------------------
5 * Copyright 2017 Eduardo Silva <eduardo@monkey.io>
6 * Copyright 2014, Zeying Xie <swpdtz at gmail dot com>
7 *
8 * Licensed under the Apache License, Version 2.0 (the "License");
9 * you may not use this file except in compliance with the License.
10 * You may obtain a copy of the License at
11 *
12 * http://www.apache.org/licenses/LICENSE-2.0
13 *
14 * Unless required by applicable law or agreed to in writing, software
15 * distributed under the License is distributed on an "AS IS" BASIS,
16 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17 * See the License for the specific language governing permissions and
18 * limitations under the License.
19 */
20
21 #include <assert.h>
22 #include <string.h>
23
24 #if defined (__APPLE__)
25 #include <sys/ucontext.h>
26 #else
27 #include <ucontext.h>
28 #endif
29
30 #include <limits.h>
31
32 #include <mk_core/mk_pthread.h>
33 #include <mk_core/mk_memory.h>
34 #include <mk_core/mk_thread.h>
35
/*
 * @OBJ_NAME: dthread
 * @OBJ_MENU: Dthread
 * @OBJ_DESC: The dthread object provides a set of methods to handle user-space
 * cooperative threads, namely dthreads (duda threads).
 * A dthread can be suspended when it encounters something that would block (in other
 * words, something that will only become available in the future), while another
 * dthread that is ready to run is awakened. Back and forth, all dthreads within the
 * same pthread work collaboratively. This means a dthread is non-preemptive and
 * requires the user to explicitly give up control when necessary.
 * Dthreads communicate with each other by using channels. A channel is like a pipe:
 * one dthread feeds data into the channel while another consumes from it.
 *
 */
49
50 #ifdef USE_VALGRIND
51 #include <valgrind/valgrind.h>
52 #endif
53
54 #define MK_THREAD_STACK_SIZE (3 * (PTHREAD_STACK_MIN) / 2)
55 #define DEFAULT_MK_THREAD_NUM 16
56
57 struct mk_thread {
58 mk_thread_func func;
59 void *data;
60 ucontext_t context;
61 struct mk_thread_scheduler *sch;
62 int status;
63 int parent_id;
64 #ifdef USE_VALGRIND
65 unsigned int valgrind_stack_id;
66 #endif
67 struct mk_list chan_list;
68 char stack[MK_THREAD_STACK_SIZE];
69 } mk_thread_t;
70
/*
 * Per-pthread dthread scheduler: a growable table of dthread slots plus
 * the context to switch back to when no dthread is running.
 */
struct mk_thread_scheduler {
    ucontext_t main;          /* context outside of any dthread */
    int n_dthread;            /* number of dthreads currently accounted for */
    int cap;                  /* number of slots available in dt */
    int running_id;           /* id of the running dthread, -1 if none */
    struct mk_thread **dt;    /* slot table indexed by dthread id */
};
78
79 static void _mk_thread_release(struct mk_thread *dt);
80
_mk_thread_entry_point(struct mk_thread_scheduler * sch)81 static void _mk_thread_entry_point(struct mk_thread_scheduler *sch)
82 {
83 int id;
84 struct mk_thread *dt;
85 struct mk_list *head;
86 struct mk_thread_channel *chan;
87
88 assert(sch);
89 id = sch->running_id;
90 dt = sch->dt[id];
91 dt->func(dt->data);
92 dt->status = MK_THREAD_DEAD;
93
94 mk_list_foreach(head, &dt->chan_list) {
95 chan = mk_list_entry(head, struct mk_thread_channel, _head);
96 chan->receiver = -1;
97 }
98 sch->n_dthread--;
99 sch->running_id = dt->parent_id;
100 }
101
mk_thread_open()102 struct mk_thread_scheduler *mk_thread_open()
103 {
104 struct mk_thread_scheduler *sch;
105
106 sch = mk_mem_alloc(sizeof(*sch));
107 if (!sch) {
108 return NULL;
109 }
110
111 sch->n_dthread = 0;
112 sch->cap = DEFAULT_MK_THREAD_NUM;
113 sch->running_id = -1;
114 sch->dt = mk_mem_alloc_z(sizeof(struct mk_thread *) * sch->cap);
115 if (!sch->dt) {
116 mk_mem_free(sch);
117 return NULL;
118 }
119
120 return sch;
121 }
122
mk_thread_close(struct mk_thread_scheduler * sch)123 void mk_thread_close(struct mk_thread_scheduler *sch)
124 {
125 struct mk_thread *dt;
126
127 int i;
128 for (i = 0; i < sch->cap; ++i) {
129 dt = sch->dt[i];
130 if (dt) {
131 _mk_thread_release(dt);
132 }
133 }
134 mk_mem_free(sch->dt);
135 sch->dt = NULL;
136 mk_mem_free(sch);
137 }
138
139 /*
140 * @METHOD_NAME: create
141 * @METHOD_DESC: create a new dthread.
142 * @METHOD_PROTO: int create(mk_thread_func func, void *data)
143 * @METHOD_PARAM: func the function to be executed when the newly created dthread
144 * is started.
145 * @METHOD_PARAM: data user specific data that will be passed to func.
146 * @METHOD_RETURN: the dthread id associated with the new dthread.
147 */
mk_thread_create(mk_thread_func func,void * data)148 int mk_thread_create(mk_thread_func func, void *data)
149 {
150 int i;
151 int id;
152 void *p;
153 struct mk_thread_scheduler *sch;
154 struct mk_thread *dt;
155
156 sch = pthread_getspecific(mk_thread_scheduler);
157 if (!sch) {
158 sch = mk_thread_open();
159 assert(sch);
160 pthread_setspecific(mk_thread_scheduler, (void *) sch);
161 }
162
163 if (sch->n_dthread >= sch->cap) {
164 id = sch->cap;
165
166 p = mk_mem_realloc(sch->dt, sch->cap * 2 * sizeof(struct mk_thread *));
167 if (!p) {
168 return -1;
169 }
170 sch->dt = p;
171 memset(sch->dt + sch->cap, 0, sizeof(struct mk_thread *) * sch->cap);
172 sch->cap *= 2;
173 }
174 else {
175 for (i = 0; i < sch->cap; ++i) {
176 id = (i + sch->cap) % sch->cap;
177 if (sch->dt[id] == NULL || sch->dt[id]->status == MK_THREAD_DEAD) {
178 break;
179 }
180 }
181 }
182
183 /* may use dthread pooling instead of release and realloc */
184 if (sch->dt[id] && sch->dt[id]->status == MK_THREAD_DEAD) {
185 _mk_thread_release(sch->dt[id]);
186 sch->dt[id] = NULL;
187 }
188
189 dt = mk_mem_alloc(sizeof(*dt));
190 if (!dt) {
191 return -1;
192 }
193
194 dt->func = func;
195 dt->data = data;
196 dt->sch = sch;
197 dt->status = MK_THREAD_READY;
198 dt->parent_id = -1;
199 #ifdef USE_VALGRIND
200 dt->valgrind_stack_id = VALGRIND_STACK_REGISTER(dt->stack, dt->stack + MK_THREAD_STACK_SIZE);
201 #endif
202 mk_list_init(&dt->chan_list);
203 sch->dt[id] = dt;
204 sch->n_dthread++;
205 return id;
206 }
207
/*
 * Free a dthread object. When built with Valgrind support, the stack
 * registration made for the dthread is dropped first.
 */
static void _mk_thread_release(struct mk_thread *dt)
{
    assert(dt != NULL);

#ifdef USE_VALGRIND
    VALGRIND_STACK_DEREGISTER(dt->valgrind_stack_id);
#endif

    mk_mem_free(dt);
}
216
217 /*
218 * @METHOD_NAME: status
219 * @METHOD_DESC: get the status of a given dthread.
220 * @METHOD_PROTO: int status(int id)
221 * @METHOD_PARAM: id the dthread id of the target dthread.
222 * @METHOD_RETURN: it returns one of the following status: MK_THREAD_DEAD, MK_THREAD_READY,
223 * MK_THREAD_RUNNING, MK_THREAD_SUSPEND.
224 */
mk_thread_status(int id)225 int mk_thread_status(int id)
226 {
227 struct mk_thread_scheduler *sch;
228
229 sch = pthread_getspecific(mk_thread_scheduler);
230 assert(sch);
231 assert(id >= 0 && id < sch->cap);
232 if (!sch->dt[id]) return MK_THREAD_DEAD;
233 return sch->dt[id]->status;
234 }
235
236 /*
237 * @METHOD_NAME: yield
238 * @METHOD_DESC: require the currently running dthread explicitly to give up control
239 * back to the dthread scheduler.
240 * @METHOD_PROTO: void yield()
241 * @METHOD_RETURN: this method do not return any value.
242 */
mk_thread_yield()243 void mk_thread_yield()
244 {
245 int id;
246 struct mk_thread *dt;
247 struct mk_thread_scheduler *sch;
248
249 sch = pthread_getspecific(mk_thread_scheduler);
250 assert(sch);
251
252 id = sch->running_id;
253 assert(id >= 0);
254
255 dt = sch->dt[id];
256 dt->status = MK_THREAD_SUSPEND;
257 sch->running_id = -1;
258 swapcontext(&dt->context, &sch->main);
259 }
260
261 /*
262 * @METHOD_NAME: resume
263 * @METHOD_DESC: resume a given dthread and suspend the currently running dthread.
264 * @METHOD_PROTO: void resume(int id)
265 * @METHOD_PARAM: id the dthread id of the target dthread.
266 * @METHOD_RETURN: this method do not return any value.
267 */
mk_thread_resume(int id)268 void mk_thread_resume(int id)
269 {
270 struct mk_thread *dt;
271 struct mk_thread *running_dt;
272 struct mk_thread_scheduler *sch;
273
274 sch = pthread_getspecific(mk_thread_scheduler);
275 assert(sch);
276 assert(id >= 0 && id < sch->cap);
277
278 running_dt = NULL;
279 if (sch->running_id != -1) {
280 running_dt = sch->dt[sch->running_id];
281 }
282
283 dt = sch->dt[id];
284 if (!dt) return;
285 switch (dt->status) {
286 case MK_THREAD_READY:
287 getcontext(&dt->context);
288 dt->context.uc_stack.ss_sp = dt->stack;
289 dt->context.uc_stack.ss_size = MK_THREAD_STACK_SIZE;
290 if (running_dt) {
291 dt->context.uc_link = &running_dt->context;
292 dt->parent_id = sch->running_id;
293 running_dt->status = MK_THREAD_SUSPEND;
294 } else {
295 dt->context.uc_link = &sch->main;
296 }
297 sch->running_id = id;
298 dt->status = MK_THREAD_RUNNING;
299 makecontext(&dt->context, (void (*)(void))_mk_thread_entry_point, 1, sch);
300 if (running_dt) {
301 swapcontext(&running_dt->context, &dt->context);
302 } else {
303 swapcontext(&sch->main, &dt->context);
304 }
305 break;
306 case MK_THREAD_SUSPEND:
307 sch->running_id = id;
308 dt->status = MK_THREAD_RUNNING;
309 if (running_dt) {
310 running_dt->status = MK_THREAD_SUSPEND;
311 swapcontext(&running_dt->context, &dt->context);
312 } else {
313 swapcontext(&sch->main, &dt->context);
314 }
315 break;
316 default:
317 assert(0);
318 }
319 }
320
321 /*
322 * @METHOD_NAME: running
323 * @METHOD_DESC: get the id of the currently running dthread.
324 * @METHOD_PROTO: int running()
325 * @METHOD_RETURN: the dthread id associated with the currently running dthread.
326 */
mk_thread_running()327 int mk_thread_running()
328 {
329 struct mk_thread_scheduler *sch;
330
331 sch = pthread_getspecific(mk_thread_scheduler);
332 assert(sch);
333 return sch->running_id;
334 }
335
mk_thread_add_channel(int id,struct mk_thread_channel * chan)336 void mk_thread_add_channel(int id, struct mk_thread_channel *chan)
337 {
338 struct mk_thread_scheduler *sch;
339 struct mk_thread *dt;
340
341 assert(chan);
342 sch = pthread_getspecific(mk_thread_scheduler);
343 assert(sch);
344 assert(id >= 0 && id < sch->cap);
345 dt = sch->dt[id];
346 mk_list_add(&chan->_head, &dt->chan_list);
347 }
348