1 /* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
2 
3 /*  Monkey HTTP Server (Duda I/O)
4  *  -----------------------------
5  *  Copyright 2017 Eduardo Silva <eduardo@monkey.io>
6  *  Copyright 2014, Zeying Xie <swpdtz at gmail dot com>
7  *
8  *  Licensed under the Apache License, Version 2.0 (the "License");
9  *  you may not use this file except in compliance with the License.
10  *  You may obtain a copy of the License at
11  *
12  *      http://www.apache.org/licenses/LICENSE-2.0
13  *
14  *  Unless required by applicable law or agreed to in writing, software
15  *  distributed under the License is distributed on an "AS IS" BASIS,
16  *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
17  *  See the License for the specific language governing permissions and
18  *  limitations under the License.
19  */
20 
21 #include <assert.h>
22 #include <string.h>
23 
24 #if defined (__APPLE__)
25 #include <sys/ucontext.h>
26 #else
27 #include <ucontext.h>
28 #endif
29 
30 #include <limits.h>
31 
32 #include <mk_core/mk_pthread.h>
33 #include <mk_core/mk_memory.h>
34 #include <mk_core/mk_thread.h>
35 
36 /*
37  * @OBJ_NAME: dthread
38  * @OBJ_MENU: Dthread
39  * @OBJ_DESC: The dthread object provides a set of methods to handle user space cooperative thread, namely dthread(duda thread).
40  * A dthread can be suspended when it encounters something that will block(in other
41  * words, something will be available in the future), while another dthread that
42  * is ready to run is awakened. Back and forth, all dthreads within the same pthread
43  * work collaboratively. This means dthread is non-preemptive and requires the user
44  * to explicitly give up control when necessary.
45  * Dthreads communicate with each other by using channel, a channel is like a pipe,
46  * one dthread feeds data to the channel while another cosumes from it.
47  *
48  */
49 
50 #ifdef USE_VALGRIND
51 #include <valgrind/valgrind.h>
52 #endif
53 
54 #define MK_THREAD_STACK_SIZE (3 * (PTHREAD_STACK_MIN) / 2)
55 #define DEFAULT_MK_THREAD_NUM    16
56 
57 struct mk_thread {
58     mk_thread_func func;
59     void *data;
60     ucontext_t context;
61     struct mk_thread_scheduler *sch;
62     int status;
63     int parent_id;
64 #ifdef USE_VALGRIND
65     unsigned int valgrind_stack_id;
66 #endif
67     struct mk_list chan_list;
68     char stack[MK_THREAD_STACK_SIZE];
69 } mk_thread_t;
70 
71 struct mk_thread_scheduler {
72     ucontext_t main;
73     int n_dthread;
74     int cap;
75     int running_id;
76     struct mk_thread **dt;
77 };
78 
79 static void _mk_thread_release(struct mk_thread *dt);
80 
_mk_thread_entry_point(struct mk_thread_scheduler * sch)81 static void _mk_thread_entry_point(struct mk_thread_scheduler *sch)
82 {
83     int id;
84     struct mk_thread *dt;
85     struct mk_list *head;
86     struct mk_thread_channel *chan;
87 
88     assert(sch);
89     id = sch->running_id;
90     dt = sch->dt[id];
91     dt->func(dt->data);
92     dt->status = MK_THREAD_DEAD;
93 
94     mk_list_foreach(head, &dt->chan_list) {
95         chan = mk_list_entry(head, struct mk_thread_channel, _head);
96         chan->receiver = -1;
97     }
98     sch->n_dthread--;
99     sch->running_id = dt->parent_id;
100 }
101 
mk_thread_open()102 struct mk_thread_scheduler *mk_thread_open()
103 {
104     struct mk_thread_scheduler *sch;
105 
106     sch = mk_mem_alloc(sizeof(*sch));
107     if (!sch) {
108         return NULL;
109     }
110 
111     sch->n_dthread = 0;
112     sch->cap = DEFAULT_MK_THREAD_NUM;
113     sch->running_id = -1;
114     sch->dt = mk_mem_alloc_z(sizeof(struct mk_thread *) * sch->cap);
115     if (!sch->dt) {
116         mk_mem_free(sch);
117         return NULL;
118     }
119 
120     return sch;
121 }
122 
mk_thread_close(struct mk_thread_scheduler * sch)123 void mk_thread_close(struct mk_thread_scheduler *sch)
124 {
125     struct mk_thread *dt;
126 
127     int i;
128     for (i = 0; i < sch->cap; ++i) {
129         dt = sch->dt[i];
130         if (dt) {
131             _mk_thread_release(dt);
132         }
133     }
134     mk_mem_free(sch->dt);
135     sch->dt = NULL;
136     mk_mem_free(sch);
137 }
138 
139 /*
140  * @METHOD_NAME: create
141  * @METHOD_DESC: create a new dthread.
142  * @METHOD_PROTO: int create(mk_thread_func func, void *data)
143  * @METHOD_PARAM: func the function to be executed when the newly created dthread
144  * is started.
145  * @METHOD_PARAM: data user specific data that will be passed to func.
146  * @METHOD_RETURN: the dthread id associated with the new dthread.
147  */
mk_thread_create(mk_thread_func func,void * data)148 int mk_thread_create(mk_thread_func func, void *data)
149 {
150     int i;
151     int id;
152     void *p;
153     struct mk_thread_scheduler *sch;
154     struct mk_thread *dt;
155 
156     sch = pthread_getspecific(mk_thread_scheduler);
157     if (!sch) {
158         sch = mk_thread_open();
159         assert(sch);
160         pthread_setspecific(mk_thread_scheduler, (void *) sch);
161     }
162 
163     if (sch->n_dthread >= sch->cap) {
164         id = sch->cap;
165 
166         p = mk_mem_realloc(sch->dt, sch->cap * 2 * sizeof(struct mk_thread *));
167         if (!p) {
168             return -1;
169         }
170         sch->dt = p;
171         memset(sch->dt + sch->cap, 0, sizeof(struct mk_thread *) * sch->cap);
172         sch->cap *= 2;
173     }
174     else {
175         for (i = 0; i < sch->cap; ++i) {
176             id = (i + sch->cap) % sch->cap;
177             if (sch->dt[id] == NULL || sch->dt[id]->status == MK_THREAD_DEAD) {
178                 break;
179             }
180         }
181     }
182 
183     /* may use dthread pooling instead of release and realloc */
184     if (sch->dt[id] && sch->dt[id]->status == MK_THREAD_DEAD) {
185         _mk_thread_release(sch->dt[id]);
186         sch->dt[id] = NULL;
187     }
188 
189     dt = mk_mem_alloc(sizeof(*dt));
190     if (!dt) {
191         return -1;
192     }
193 
194     dt->func = func;
195     dt->data = data;
196     dt->sch = sch;
197     dt->status = MK_THREAD_READY;
198     dt->parent_id = -1;
199 #ifdef USE_VALGRIND
200     dt->valgrind_stack_id = VALGRIND_STACK_REGISTER(dt->stack, dt->stack + MK_THREAD_STACK_SIZE);
201 #endif
202     mk_list_init(&dt->chan_list);
203     sch->dt[id] = dt;
204     sch->n_dthread++;
205     return id;
206 }
207 
_mk_thread_release(struct mk_thread * dt)208 static void _mk_thread_release(struct mk_thread *dt)
209 {
210     assert(dt);
211 #ifdef USE_VALGRIND
212     VALGRIND_STACK_DEREGISTER(dt->valgrind_stack_id);
213 #endif
214     mk_mem_free(dt);
215 }
216 
217 /*
218  * @METHOD_NAME: status
219  * @METHOD_DESC: get the status of a given dthread.
220  * @METHOD_PROTO: int status(int id)
221  * @METHOD_PARAM: id the dthread id of the target dthread.
222  * @METHOD_RETURN: it returns one of the following status: MK_THREAD_DEAD, MK_THREAD_READY,
223  * MK_THREAD_RUNNING, MK_THREAD_SUSPEND.
224  */
mk_thread_status(int id)225 int mk_thread_status(int id)
226 {
227     struct mk_thread_scheduler *sch;
228 
229     sch = pthread_getspecific(mk_thread_scheduler);
230     assert(sch);
231     assert(id >= 0 && id < sch->cap);
232     if (!sch->dt[id]) return MK_THREAD_DEAD;
233     return sch->dt[id]->status;
234 }
235 
236 /*
237  * @METHOD_NAME: yield
238  * @METHOD_DESC: require the currently running dthread explicitly to give up control
239  * back to the dthread scheduler.
240  * @METHOD_PROTO: void yield()
241  * @METHOD_RETURN: this method do not return any value.
242  */
mk_thread_yield()243 void mk_thread_yield()
244 {
245     int id;
246     struct mk_thread *dt;
247     struct mk_thread_scheduler *sch;
248 
249     sch = pthread_getspecific(mk_thread_scheduler);
250     assert(sch);
251 
252     id = sch->running_id;
253     assert(id >= 0);
254 
255     dt = sch->dt[id];
256     dt->status = MK_THREAD_SUSPEND;
257     sch->running_id = -1;
258     swapcontext(&dt->context, &sch->main);
259 }
260 
261 /*
262  * @METHOD_NAME: resume
263  * @METHOD_DESC: resume a given dthread and suspend the currently running dthread.
264  * @METHOD_PROTO: void resume(int id)
265  * @METHOD_PARAM: id the dthread id of the target dthread.
266  * @METHOD_RETURN: this method do not return any value.
267  */
mk_thread_resume(int id)268 void mk_thread_resume(int id)
269 {
270     struct mk_thread *dt;
271     struct mk_thread *running_dt;
272     struct mk_thread_scheduler *sch;
273 
274     sch = pthread_getspecific(mk_thread_scheduler);
275     assert(sch);
276     assert(id >= 0 && id < sch->cap);
277 
278     running_dt = NULL;
279     if (sch->running_id != -1) {
280         running_dt = sch->dt[sch->running_id];
281     }
282 
283     dt = sch->dt[id];
284     if (!dt) return;
285     switch (dt->status) {
286     case MK_THREAD_READY:
287         getcontext(&dt->context);
288         dt->context.uc_stack.ss_sp = dt->stack;
289         dt->context.uc_stack.ss_size = MK_THREAD_STACK_SIZE;
290         if (running_dt) {
291             dt->context.uc_link = &running_dt->context;
292             dt->parent_id = sch->running_id;
293             running_dt->status = MK_THREAD_SUSPEND;
294         } else {
295             dt->context.uc_link = &sch->main;
296         }
297         sch->running_id = id;
298         dt->status = MK_THREAD_RUNNING;
299         makecontext(&dt->context, (void (*)(void))_mk_thread_entry_point, 1, sch);
300         if (running_dt) {
301             swapcontext(&running_dt->context, &dt->context);
302         } else {
303             swapcontext(&sch->main, &dt->context);
304         }
305         break;
306     case MK_THREAD_SUSPEND:
307         sch->running_id = id;
308         dt->status = MK_THREAD_RUNNING;
309         if (running_dt) {
310             running_dt->status = MK_THREAD_SUSPEND;
311             swapcontext(&running_dt->context, &dt->context);
312         } else {
313             swapcontext(&sch->main, &dt->context);
314         }
315         break;
316     default:
317         assert(0);
318     }
319 }
320 
321 /*
322  * @METHOD_NAME: running
323  * @METHOD_DESC: get the id of the currently running dthread.
324  * @METHOD_PROTO: int running()
325  * @METHOD_RETURN: the dthread id associated with the currently running dthread.
326  */
mk_thread_running()327 int mk_thread_running()
328 {
329     struct mk_thread_scheduler *sch;
330 
331     sch = pthread_getspecific(mk_thread_scheduler);
332     assert(sch);
333     return sch->running_id;
334 }
335 
mk_thread_add_channel(int id,struct mk_thread_channel * chan)336 void mk_thread_add_channel(int id, struct mk_thread_channel *chan)
337 {
338     struct mk_thread_scheduler *sch;
339     struct mk_thread *dt;
340 
341     assert(chan);
342     sch = pthread_getspecific(mk_thread_scheduler);
343     assert(sch);
344     assert(id >= 0 && id < sch->cap);
345     dt = sch->dt[id];
346     mk_list_add(&chan->_head, &dt->chan_list);
347 }
348