1 /*
2 * Copyright (c) 2014 Joris Vink <joris@coders.se>
3 *
4 * Permission to use, copy, modify, and distribute this software for any
5 * purpose with or without fee is hereby granted, provided that the above
6 * copyright notice and this permission notice appear in all copies.
7 *
8 * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9 * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10 * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11 * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12 * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13 * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14 * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15 */
16
17 #include <sys/param.h>
18 #include <sys/queue.h>
19 #include <sys/socket.h>
20
21 #include <pthread.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24
25 #include "kore.h"
26 #include "http.h"
27 #include "tasks.h"
28
#if defined(__linux__)
#include "seccomp.h"

/*
 * Additional syscalls the seccomp policy must allow once tasks are in
 * use: clone (presumably for pthread_create), socketpair (the task
 * communication channel, see kore_task_create) and set_robust_list
 * (pthread internals) — registered in kore_task_init().
 */
static struct sock_filter filter_task[] = {
	KORE_SYSCALL_ALLOW(clone),
	KORE_SYSCALL_ALLOW(socketpair),
	KORE_SYSCALL_ALLOW(set_robust_list),
};
#endif
38
/* Number of task threads this worker has spawned so far. */
static u_int8_t threads;
/* All live task threads; kore_task_run() rotates entries to the tail. */
static TAILQ_HEAD(, kore_task_thread) task_threads;

/* Upper bound on task threads per worker; defaults to KORE_TASK_THREADS. */
u_int16_t kore_task_threads = KORE_TASK_THREADS;

static void *task_thread(void *);
static void task_channel_read(int, void *, u_int32_t);
static void task_channel_write(int, void *, u_int32_t);
static void task_thread_spawn(struct kore_task_thread **);
48
/*
 * Select which end of a task's socketpair to use: the in-task end (i)
 * when the caller is the task thread itself, the worker end (o)
 * otherwise.
 *
 * pthread_equal() is used because pthread_t is an opaque type and
 * comparing it with == is not portable per POSIX. The do { } while (0)
 * deliberately has no trailing semicolon so the macro expands to
 * exactly one statement and composes safely with if/else.
 */
#define THREAD_FD_ASSIGN(t, f, i, o)				\
	do {							\
		if (pthread_equal(pthread_self(), t)) {		\
			f = i;					\
		} else {					\
			f = o;					\
		}						\
	} while (0)
57
/*
 * Called once at startup: reset the thread counter, initialize the
 * global task thread list and, on Linux, register the seccomp filter
 * with the extra syscalls task threads require.
 */
void
kore_task_init(void)
{
	threads = 0;
	TAILQ_INIT(&task_threads);

#if defined(__linux__)
	kore_seccomp_filter("task", filter_task, KORE_FILTER_LEN(filter_task));
#endif
}
68
/*
 * Initialize a task: clear its callback/request bindings, set up its
 * event, entry point, state, rwlock and the socketpair used as the
 * communication channel between worker and task thread.
 * Fatal error if socketpair() fails.
 */
void
kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *))
{
	t->cb = NULL;
#if !defined(KORE_NO_HTTP)
	t->req = NULL;
#endif
	t->evt.type = KORE_TYPE_TASK;
	t->evt.handle = kore_task_handle;

	t->entry = entry;
	t->state = KORE_TASK_STATE_CREATED;
	/* NOTE(review): pthread_rwlock_init() return value is not checked. */
	pthread_rwlock_init(&(t->lock), NULL);

	/* fds[0] = worker side, fds[1] = task-thread side (see THREAD_FD_ASSIGN uses). */
	if (socketpair(AF_UNIX, SOCK_STREAM, 0, t->fds) == -1)
		fatal("kore_task_create: socketpair() %s", errno_s);
}
86
/*
 * Schedule a task for execution. The worker starts polling the
 * worker-side channel fd, then either spawns a fresh task thread
 * (while under the kore_task_threads limit) or re-uses the oldest
 * existing thread, rotating it to the tail of the thread list.
 * The task is queued on the chosen thread and its condvar signalled.
 */
void
kore_task_run(struct kore_task *t)
{
	struct kore_task_thread *tt;

	kore_platform_schedule_read(t->fds[0], t);
	if (threads < kore_task_threads) {
		/* task_thread_spawn() will lock tt->lock for us. */
		task_thread_spawn(&tt);
	} else {
		/* Cycle task around. */
		if ((tt = TAILQ_FIRST(&task_threads)) == NULL)
			fatal("no available tasks threads?");
		pthread_mutex_lock(&(tt->lock));
		TAILQ_REMOVE(&task_threads, tt, list);
		TAILQ_INSERT_TAIL(&task_threads, tt, list);
	}

	/* On both paths tt->lock is held here, so the queue insert is safe. */
	t->thread = tt;
	TAILQ_INSERT_TAIL(&(tt->tasks), t, list);

	pthread_mutex_unlock(&(tt->lock));
	pthread_cond_signal(&(tt->cond));
}
111
#if !defined(KORE_NO_HTTP)
/*
 * Tie a task to an HTTP request: link the task onto the request's task
 * list and put the request to sleep; kore_task_handle() wakes it again
 * on task activity. Mutually exclusive with a bound callback.
 */
void
kore_task_bind_request(struct kore_task *t, struct http_request *req)
{
	kore_debug("kore_task_bind_request: %p bound to %p", req, t);

	if (t->cb != NULL)
		fatal("cannot bind cbs and requests at the same time");

	t->req = req;
	LIST_INSERT_HEAD(&(req->tasks), t, rlist);

	http_request_sleep(req);
}
#endif
127
/*
 * Attach a completion callback to a task, invoked from
 * kore_task_handle() on task events. Mutually exclusive with binding
 * an HTTP request (see kore_task_bind_request).
 */
void
kore_task_bind_callback(struct kore_task *t, void (*cb)(struct kore_task *))
{
#if !defined(KORE_NO_HTTP)
	if (t->req != NULL)
		fatal("cannot bind requests and cbs at the same time");
#endif
	t->cb = cb;
}
137
/*
 * Destroy a task: detach it from its bound HTTP request (if any),
 * close both channel fds under the write lock, then destroy the lock.
 * NOTE(review): the task structure itself is not freed here —
 * presumably the caller owns the allocation; confirm at call sites.
 */
void
kore_task_destroy(struct kore_task *t)
{
	kore_debug("kore_task_destroy: %p", t);

#if !defined(KORE_NO_HTTP)
	if (t->req != NULL) {
		t->req = NULL;
		LIST_REMOVE(t, rlist);
	}
#endif

	pthread_rwlock_wrlock(&(t->lock));

	/* -1 marks a channel end that was already closed. */
	if (t->fds[0] != -1) {
		(void)close(t->fds[0]);
		t->fds[0] = -1;
	}

	if (t->fds[1] != -1) {
		(void)close(t->fds[1]);
		t->fds[1] = -1;
	}

	pthread_rwlock_unlock(&(t->lock));
	pthread_rwlock_destroy(&(t->lock));
}
165
166 int
kore_task_finished(struct kore_task * t)167 kore_task_finished(struct kore_task *t)
168 {
169 return ((kore_task_state(t) == KORE_TASK_STATE_FINISHED));
170 }
171
172 void
kore_task_finish(struct kore_task * t)173 kore_task_finish(struct kore_task *t)
174 {
175 kore_debug("kore_task_finished: %p (%d)", t, t->result);
176 pthread_rwlock_wrlock(&(t->lock));
177
178 if (t->fds[1] != -1) {
179 (void)close(t->fds[1]);
180 t->fds[1] = -1;
181 }
182
183 pthread_rwlock_unlock(&(t->lock));
184 }
185
186 void
kore_task_channel_write(struct kore_task * t,void * data,u_int32_t len)187 kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len)
188 {
189 int fd;
190
191 kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len);
192
193 THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
194 task_channel_write(fd, &len, sizeof(len));
195 task_channel_write(fd, data, len);
196 }
197
198 u_int32_t
kore_task_channel_read(struct kore_task * t,void * out,u_int32_t len)199 kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len)
200 {
201 int fd;
202 u_int32_t dlen, bytes;
203
204 kore_debug("kore_task_channel_read: %p -> %p (%ld)", t, out, len);
205
206 THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
207 task_channel_read(fd, &dlen, sizeof(dlen));
208
209 if (dlen > len)
210 bytes = len;
211 else
212 bytes = dlen;
213
214 task_channel_read(fd, out, bytes);
215
216 return (dlen);
217 }
218
/*
 * Event handler for a task fd, run from the worker's event loop
 * (installed in kore_task_create via t->evt.handle). Wakes any bound
 * HTTP request; when `finished` is set, stops polling the channel,
 * marks the task finished and destroys it if its request is flagged
 * for deletion. Finally invokes the bound callback, if any.
 */
void
kore_task_handle(void *arg, int finished)
{
	struct kore_task *t = arg;

	kore_debug("kore_task_handle: %p, %d", t, finished);

#if !defined(KORE_NO_HTTP)
	if (t->req != NULL)
		http_request_wakeup(t->req);
#endif

	if (finished) {
		kore_platform_disable_read(t->fds[0]);
		kore_task_set_state(t, KORE_TASK_STATE_FINISHED);
#if !defined(KORE_NO_HTTP)
		if (t->req != NULL) {
			if (t->req->flags & HTTP_REQUEST_DELETE)
				kore_task_destroy(t);
		}
#endif
	}

	/*
	 * Reading t->cb after the destroy path above is safe: destroy is
	 * only reached when a request was bound, and binding a request
	 * fatals if a callback was already set — so t->cb is NULL there
	 * (and kore_task_destroy does not free t in this file).
	 */
	if (t->cb != NULL)
		t->cb(t);
}
245
246 int
kore_task_state(struct kore_task * t)247 kore_task_state(struct kore_task *t)
248 {
249 int s;
250
251 pthread_rwlock_rdlock(&(t->lock));
252 s = t->state;
253 pthread_rwlock_unlock(&(t->lock));
254
255 return (s);
256 }
257
258 void
kore_task_set_state(struct kore_task * t,int state)259 kore_task_set_state(struct kore_task *t, int state)
260 {
261 pthread_rwlock_wrlock(&(t->lock));
262 t->state = state;
263 pthread_rwlock_unlock(&(t->lock));
264 }
265
266 int
kore_task_result(struct kore_task * t)267 kore_task_result(struct kore_task *t)
268 {
269 int r;
270
271 pthread_rwlock_rdlock(&(t->lock));
272 r = t->result;
273 pthread_rwlock_unlock(&(t->lock));
274
275 return (r);
276 }
277
278 void
kore_task_set_result(struct kore_task * t,int result)279 kore_task_set_result(struct kore_task *t, int result)
280 {
281 pthread_rwlock_wrlock(&(t->lock));
282 t->result = result;
283 pthread_rwlock_unlock(&(t->lock));
284 }
285
286 static void
task_channel_write(int fd,void * data,u_int32_t len)287 task_channel_write(int fd, void *data, u_int32_t len)
288 {
289 ssize_t r;
290 u_int8_t *d;
291 u_int32_t offset;
292
293 d = data;
294 offset = 0;
295 while (offset != len) {
296 r = send(fd, d + offset, len - offset, 0);
297 if (r == -1 && errno == EINTR)
298 continue;
299 if (r == -1)
300 fatal("task_channel_write: %s", errno_s);
301 offset += r;
302 }
303 }
304
305 static void
task_channel_read(int fd,void * out,u_int32_t len)306 task_channel_read(int fd, void *out, u_int32_t len)
307 {
308 ssize_t r;
309 u_int8_t *d;
310 u_int32_t offset;
311
312 d = out;
313 offset = 0;
314 while (offset != len) {
315 r = read(fd, d + offset, len - offset);
316 if (r == -1 && errno == EINTR)
317 continue;
318 if (r == -1)
319 fatal("task_channel_read: %s", errno_s);
320 if (r == 0)
321 fatal("task_channel_read: unexpected eof");
322
323 offset += r;
324 }
325 }
326
/*
 * Create a new task thread and register it on the global thread list.
 * IMPORTANT: returns with the new thread's mutex HELD — the caller
 * (kore_task_run) queues its task and unlocks. The freshly started
 * thread immediately tries to take the same mutex in task_thread(),
 * so it blocks until the caller has finished queueing.
 */
static void
task_thread_spawn(struct kore_task_thread **out)
{
	struct kore_task_thread *tt;

	tt = kore_malloc(sizeof(*tt));
	tt->idx = threads++;

	TAILQ_INIT(&(tt->tasks));
	pthread_cond_init(&(tt->cond), NULL);
	pthread_mutex_init(&(tt->lock), NULL);
	pthread_mutex_lock(&(tt->lock));
	TAILQ_INSERT_TAIL(&task_threads, tt, list);

	if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0)
		fatal("pthread_create: %s", errno_s);

	*out = tt;
}
346
347 static void *
task_thread(void * arg)348 task_thread(void *arg)
349 {
350 struct kore_task *t;
351 struct kore_task_thread *tt = arg;
352
353 kore_debug("task_thread: #%d starting", tt->idx);
354
355 pthread_mutex_lock(&(tt->lock));
356
357 for (;;) {
358 if (TAILQ_EMPTY(&(tt->tasks)))
359 pthread_cond_wait(&(tt->cond), &(tt->lock));
360
361 kore_debug("task_thread#%d: woke up", tt->idx);
362
363 t = TAILQ_FIRST(&(tt->tasks));
364 TAILQ_REMOVE(&(tt->tasks), t, list);
365 pthread_mutex_unlock(&(tt->lock));
366
367 kore_debug("task_thread#%d: executing %p", tt->idx, t);
368
369 kore_task_set_state(t, KORE_TASK_STATE_RUNNING);
370 kore_task_set_result(t, t->entry(t));
371 kore_task_finish(t);
372
373 pthread_mutex_lock(&(tt->lock));
374 }
375
376 pthread_exit(NULL);
377
378 /* NOTREACHED */
379 return (NULL);
380 }
381