1 /*
2  * Copyright (c) 2014 Joris Vink <joris@coders.se>
3  *
4  * Permission to use, copy, modify, and distribute this software for any
5  * purpose with or without fee is hereby granted, provided that the above
6  * copyright notice and this permission notice appear in all copies.
7  *
8  * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
9  * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10  * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
11  * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12  * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13  * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
14  * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15  */
16 
17 #include <sys/param.h>
18 #include <sys/queue.h>
19 #include <sys/socket.h>
20 
21 #include <pthread.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 
25 #include "kore.h"
26 #include "http.h"
27 #include "tasks.h"
28 
29 #if defined(__linux__)
30 #include "seccomp.h"
31 
32 static struct sock_filter filter_task[] = {
33 	KORE_SYSCALL_ALLOW(clone),
34 	KORE_SYSCALL_ALLOW(socketpair),
35 	KORE_SYSCALL_ALLOW(set_robust_list),
36 };
37 #endif
38 
39 static u_int8_t				threads;
40 static TAILQ_HEAD(, kore_task_thread)	task_threads;
41 
42 u_int16_t	kore_task_threads = KORE_TASK_THREADS;
43 
44 static void	*task_thread(void *);
45 static void	task_channel_read(int, void *, u_int32_t);
46 static void	task_channel_write(int, void *, u_int32_t);
47 static void	task_thread_spawn(struct kore_task_thread **);
48 
49 #define THREAD_FD_ASSIGN(t, f, i, o)				\
50 	do {							\
51 		if (pthread_self() == t) {			\
52 			f = i;					\
53 		} else {					\
54 			f = o;					\
55 		}						\
56 	} while (0);
57 
58 void
kore_task_init(void)59 kore_task_init(void)
60 {
61 	threads = 0;
62 	TAILQ_INIT(&task_threads);
63 
64 #if defined(__linux__)
65 	kore_seccomp_filter("task", filter_task, KORE_FILTER_LEN(filter_task));
66 #endif
67 }
68 
69 void
kore_task_create(struct kore_task * t,int (* entry)(struct kore_task *))70 kore_task_create(struct kore_task *t, int (*entry)(struct kore_task *))
71 {
72 	t->cb = NULL;
73 #if !defined(KORE_NO_HTTP)
74 	t->req = NULL;
75 #endif
76 	t->evt.type = KORE_TYPE_TASK;
77 	t->evt.handle = kore_task_handle;
78 
79 	t->entry = entry;
80 	t->state = KORE_TASK_STATE_CREATED;
81 	pthread_rwlock_init(&(t->lock), NULL);
82 
83 	if (socketpair(AF_UNIX, SOCK_STREAM, 0, t->fds) == -1)
84 		fatal("kore_task_create: socketpair() %s", errno_s);
85 }
86 
87 void
kore_task_run(struct kore_task * t)88 kore_task_run(struct kore_task *t)
89 {
90 	struct kore_task_thread		*tt;
91 
92 	kore_platform_schedule_read(t->fds[0], t);
93 	if (threads < kore_task_threads) {
94 		/* task_thread_spawn() will lock tt->lock for us. */
95 		task_thread_spawn(&tt);
96 	} else {
97 		/* Cycle task around. */
98 		if ((tt = TAILQ_FIRST(&task_threads)) == NULL)
99 			fatal("no available tasks threads?");
100 		pthread_mutex_lock(&(tt->lock));
101 		TAILQ_REMOVE(&task_threads, tt, list);
102 		TAILQ_INSERT_TAIL(&task_threads, tt, list);
103 	}
104 
105 	t->thread = tt;
106 	TAILQ_INSERT_TAIL(&(tt->tasks), t, list);
107 
108 	pthread_mutex_unlock(&(tt->lock));
109 	pthread_cond_signal(&(tt->cond));
110 }
111 
112 #if !defined(KORE_NO_HTTP)
113 void
kore_task_bind_request(struct kore_task * t,struct http_request * req)114 kore_task_bind_request(struct kore_task *t, struct http_request *req)
115 {
116 	kore_debug("kore_task_bind_request: %p bound to %p", req, t);
117 
118 	if (t->cb != NULL)
119 		fatal("cannot bind cbs and requests at the same time");
120 
121 	t->req = req;
122 	LIST_INSERT_HEAD(&(req->tasks), t, rlist);
123 
124 	http_request_sleep(req);
125 }
126 #endif
127 
128 void
kore_task_bind_callback(struct kore_task * t,void (* cb)(struct kore_task *))129 kore_task_bind_callback(struct kore_task *t, void (*cb)(struct kore_task *))
130 {
131 #if !defined(KORE_NO_HTTP)
132 	if (t->req != NULL)
133 		fatal("cannot bind requests and cbs at the same time");
134 #endif
135 	t->cb = cb;
136 }
137 
138 void
kore_task_destroy(struct kore_task * t)139 kore_task_destroy(struct kore_task *t)
140 {
141 	kore_debug("kore_task_destroy: %p", t);
142 
143 #if !defined(KORE_NO_HTTP)
144 	if (t->req != NULL) {
145 		t->req = NULL;
146 		LIST_REMOVE(t, rlist);
147 	}
148 #endif
149 
150 	pthread_rwlock_wrlock(&(t->lock));
151 
152 	if (t->fds[0] != -1) {
153 		(void)close(t->fds[0]);
154 		t->fds[0] = -1;
155 	}
156 
157 	if (t->fds[1] != -1) {
158 		(void)close(t->fds[1]);
159 		t->fds[1] = -1;
160 	}
161 
162 	pthread_rwlock_unlock(&(t->lock));
163 	pthread_rwlock_destroy(&(t->lock));
164 }
165 
166 int
kore_task_finished(struct kore_task * t)167 kore_task_finished(struct kore_task *t)
168 {
169 	return ((kore_task_state(t) == KORE_TASK_STATE_FINISHED));
170 }
171 
172 void
kore_task_finish(struct kore_task * t)173 kore_task_finish(struct kore_task *t)
174 {
175 	kore_debug("kore_task_finished: %p (%d)", t, t->result);
176 	pthread_rwlock_wrlock(&(t->lock));
177 
178 	if (t->fds[1] != -1) {
179 		(void)close(t->fds[1]);
180 		t->fds[1] = -1;
181 	}
182 
183 	pthread_rwlock_unlock(&(t->lock));
184 }
185 
186 void
kore_task_channel_write(struct kore_task * t,void * data,u_int32_t len)187 kore_task_channel_write(struct kore_task *t, void *data, u_int32_t len)
188 {
189 	int		fd;
190 
191 	kore_debug("kore_task_channel_write: %p <- %p (%ld)", t, data, len);
192 
193 	THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
194 	task_channel_write(fd, &len, sizeof(len));
195 	task_channel_write(fd, data, len);
196 }
197 
198 u_int32_t
kore_task_channel_read(struct kore_task * t,void * out,u_int32_t len)199 kore_task_channel_read(struct kore_task *t, void *out, u_int32_t len)
200 {
201 	int		fd;
202 	u_int32_t	dlen, bytes;
203 
204 	kore_debug("kore_task_channel_read: %p -> %p (%ld)", t, out, len);
205 
206 	THREAD_FD_ASSIGN(t->thread->tid, fd, t->fds[1], t->fds[0]);
207 	task_channel_read(fd, &dlen, sizeof(dlen));
208 
209 	if (dlen > len)
210 		bytes = len;
211 	else
212 		bytes = dlen;
213 
214 	task_channel_read(fd, out, bytes);
215 
216 	return (dlen);
217 }
218 
219 void
kore_task_handle(void * arg,int finished)220 kore_task_handle(void *arg, int finished)
221 {
222 	struct kore_task	*t = arg;
223 
224 	kore_debug("kore_task_handle: %p, %d", t, finished);
225 
226 #if !defined(KORE_NO_HTTP)
227 	if (t->req != NULL)
228 		http_request_wakeup(t->req);
229 #endif
230 
231 	if (finished) {
232 		kore_platform_disable_read(t->fds[0]);
233 		kore_task_set_state(t, KORE_TASK_STATE_FINISHED);
234 #if !defined(KORE_NO_HTTP)
235 		if (t->req != NULL) {
236 			if (t->req->flags & HTTP_REQUEST_DELETE)
237 				kore_task_destroy(t);
238 		}
239 #endif
240 	}
241 
242 	if (t->cb != NULL)
243 		t->cb(t);
244 }
245 
246 int
kore_task_state(struct kore_task * t)247 kore_task_state(struct kore_task *t)
248 {
249 	int	s;
250 
251 	pthread_rwlock_rdlock(&(t->lock));
252 	s = t->state;
253 	pthread_rwlock_unlock(&(t->lock));
254 
255 	return (s);
256 }
257 
258 void
kore_task_set_state(struct kore_task * t,int state)259 kore_task_set_state(struct kore_task *t, int state)
260 {
261 	pthread_rwlock_wrlock(&(t->lock));
262 	t->state = state;
263 	pthread_rwlock_unlock(&(t->lock));
264 }
265 
266 int
kore_task_result(struct kore_task * t)267 kore_task_result(struct kore_task *t)
268 {
269 	int	r;
270 
271 	pthread_rwlock_rdlock(&(t->lock));
272 	r = t->result;
273 	pthread_rwlock_unlock(&(t->lock));
274 
275 	return (r);
276 }
277 
278 void
kore_task_set_result(struct kore_task * t,int result)279 kore_task_set_result(struct kore_task *t, int result)
280 {
281 	pthread_rwlock_wrlock(&(t->lock));
282 	t->result = result;
283 	pthread_rwlock_unlock(&(t->lock));
284 }
285 
286 static void
task_channel_write(int fd,void * data,u_int32_t len)287 task_channel_write(int fd, void *data, u_int32_t len)
288 {
289 	ssize_t		r;
290 	u_int8_t	*d;
291 	u_int32_t	offset;
292 
293 	d = data;
294 	offset = 0;
295 	while (offset != len) {
296 		r = send(fd, d + offset, len - offset, 0);
297 		if (r == -1 && errno == EINTR)
298 			continue;
299 		if (r == -1)
300 			fatal("task_channel_write: %s", errno_s);
301 		offset += r;
302 	}
303 }
304 
305 static void
task_channel_read(int fd,void * out,u_int32_t len)306 task_channel_read(int fd, void *out, u_int32_t len)
307 {
308 	ssize_t		r;
309 	u_int8_t	*d;
310 	u_int32_t	offset;
311 
312 	d = out;
313 	offset = 0;
314 	while (offset != len) {
315 		r = read(fd, d + offset, len - offset);
316 		if (r == -1 && errno == EINTR)
317 			continue;
318 		if (r == -1)
319 			fatal("task_channel_read: %s", errno_s);
320 		if (r == 0)
321 			fatal("task_channel_read: unexpected eof");
322 
323 		offset += r;
324 	}
325 }
326 
327 static void
task_thread_spawn(struct kore_task_thread ** out)328 task_thread_spawn(struct kore_task_thread **out)
329 {
330 	struct kore_task_thread		*tt;
331 
332 	tt = kore_malloc(sizeof(*tt));
333 	tt->idx = threads++;
334 
335 	TAILQ_INIT(&(tt->tasks));
336 	pthread_cond_init(&(tt->cond), NULL);
337 	pthread_mutex_init(&(tt->lock), NULL);
338 	pthread_mutex_lock(&(tt->lock));
339 	TAILQ_INSERT_TAIL(&task_threads, tt, list);
340 
341 	if (pthread_create(&(tt->tid), NULL, task_thread, tt) != 0)
342 		fatal("pthread_create: %s", errno_s);
343 
344 	*out = tt;
345 }
346 
347 static void *
task_thread(void * arg)348 task_thread(void *arg)
349 {
350 	struct kore_task		*t;
351 	struct kore_task_thread		*tt = arg;
352 
353 	kore_debug("task_thread: #%d starting", tt->idx);
354 
355 	pthread_mutex_lock(&(tt->lock));
356 
357 	for (;;) {
358 		if (TAILQ_EMPTY(&(tt->tasks)))
359 			pthread_cond_wait(&(tt->cond), &(tt->lock));
360 
361 		kore_debug("task_thread#%d: woke up", tt->idx);
362 
363 		t = TAILQ_FIRST(&(tt->tasks));
364 		TAILQ_REMOVE(&(tt->tasks), t, list);
365 		pthread_mutex_unlock(&(tt->lock));
366 
367 		kore_debug("task_thread#%d: executing %p", tt->idx, t);
368 
369 		kore_task_set_state(t, KORE_TASK_STATE_RUNNING);
370 		kore_task_set_result(t, t->entry(t));
371 		kore_task_finish(t);
372 
373 		pthread_mutex_lock(&(tt->lock));
374 	}
375 
376 	pthread_exit(NULL);
377 
378 	/* NOTREACHED */
379 	return (NULL);
380 }
381