1 /* PEAK Library
2 *
3 * Copyright (c) 2003, 2004
4 * Stephane Thiell <mbuna@bugged.org>. All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 *
10 * 1. Redistributions of source code must retain the above copyright
11 * notice, this list of conditions and the following disclaimer.
12 *
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in the
15 * documentation and/or other materials provided with the distribution.
16 *
17 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18 * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED
19 * TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
20 * PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
21 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
22 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
23 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS;
24 * OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY,
25 * WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR
26 * OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
27 * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28 *
29 */
30 #define RCSID "$Id: task.c,v 1.10 2005/02/02 10:07:20 mbuna Exp $"
31
32 #ifdef HAVE_CONFIG_H
33 #include "config.h"
34 #endif
35
36 #include <peak/task.h>
37 #include <peak/time.h>
38
39 #include "task_class.h"
40 #include "task_private.h"
41 #include "task_runloop.h"
42 #include "timer_tree.h"
43 #include "init_private.h"
44 #include "utilities.h"
45
46 #include <assert.h>
47 #include <stdio.h>
48 #include <stdlib.h>
49 #include <pthread.h>
50
51
52 extern int _peak_is_threaded;
53
54
55 static void __peak_task_init(peak_task task, va_list vp, void *ctcx);
56 static void __peak_task_finalize(peak_task task);
57
58 static void __peak_task_autospawn(peak_task task);
59 static void __peak_task_respawn(peak_task task, int n);
60 static void __peak_task_spawn(peak_task task, int n);
61
62 static void __peak_task_timer_insert(peak_task task, peak_timer ti);
63 static void __peak_task_timer_remove(peak_task task, peak_timer ti);
64 static int __peak_task_timer_prefetch(double now, peak_timer n,
65 peak_timer *tp, int idx);
66 static void __peak_task_op_timer_schedule_fire(peak_task task, peak_timer ti);
67
68
69
70
71 PEAK_CLASS_BASE_DECLARE(task);
72
73
74 __private_extern__ peak_task
_peak_task_create()75 _peak_task_create()
76 {
77 return PEAK_CLASS_CONSTRUCT0(task);
78 }
79
80 static void
__peak_task_init(peak_task task,va_list vp,void * ctcx)81 __peak_task_init(peak_task task, va_list vp, void *ctcx)
82 {
83 task->_pool = peak_task_runloop_stackpool_create();
84 task->_hdlsem = peak_semaphore_create(0);
85 task->_runsem = peak_semaphore_create(0);
86 task->_exsem = peak_semaphore_create(0);
87 task->_excosem = peak_semaphore_create(0);
88 task->_exlock = PEAK_SPINLOCK_INITIALIZER;
89 task->_lock = PEAK_SPINLOCK_INITIALIZER;
90
91 #ifdef PEAK_SIMULATED_NCPUS
92 task->_ncpus = PEAK_SIMULATED_NCPUS;
93 #else
94 if ((task->_ncpus = peak_get_ncpus()) < 1)
95 task->_ncpus = 1;
96 #endif
97
98 task->_nevts = 0;
99 peak_atomic_set(&task->_nruns, 0);
100 peak_atomic_set(&task->_nexs, 0);
101
102 /* Create underlying task's engine. */
103 task->_engine = _peak_engine_create(task);
104
105 /* Initialize timer splay tree. */
106 task->_tifirst = NULL;
107 task->_tiroot = NULL;
108
109 task->_flags = TASK_FLAG_DEFAULT;
110
111 /* "Built-in" master runloop. */
112 task->_master = _peak_task_runloop_create(task);
113 peak_task_runloop_stackpool_push(task->_pool, task->_master);
114 task->_nthreads = -1; /* Special value: autospawn wanted */
115
116 _peak_init_thread_task(task);
117 }
118
119 static void
__peak_task_finalize(peak_task task)120 __peak_task_finalize(peak_task task)
121 {
122 PEAK_HALT; /* shouldn't happen *for now* */
123 peak_release(task->_engine);
124 peak_release(task->_excosem);
125 peak_release(task->_exsem);
126 peak_release(task->_runsem);
127 peak_release(task->_hdlsem);
128 peak_release(task->_pool);
129 }
130
131 static inline void
__peak_task_lock(peak_task task)132 __peak_task_lock(peak_task task)
133 {
134 _peak_spinlock_lock(&task->_lock);
135 }
136
137 static inline void
__peak_task_unlock(peak_task task)138 __peak_task_unlock(peak_task task)
139 {
140 _peak_spinlock_unlock(&task->_lock);
141 }
142
143 int
peak_task_get_info(peak_task task,peak_task_flavor_t flavor,int * info)144 peak_task_get_info(peak_task task, peak_task_flavor_t flavor, int *info)
145 {
146 peak_task_exclusivity(); /* Way to dangerous otherwise... */
147
148 switch (flavor)
149 {
150 case PEAK_TASK_FLAVOR_NTHREADS:
151 info[0] = task->_nthreads == -1 ? task->_ncpus : task->_nthreads;
152 break;
153 case PEAK_TASK_FLAVOR_MAXFDS:
154 info[0] = _peak_engine_get_maxfds(task->_engine);
155 break;
156 default:
157 return -1;
158 }
159 return 0;
160 }
161
162 int
peak_task_set_info(peak_task task,peak_task_flavor_t flavor,int * info)163 peak_task_set_info(peak_task task, peak_task_flavor_t flavor, int *info)
164 {
165 peak_task_exclusivity(); /* Way to dangerous otherwise... */
166
167 switch (flavor)
168 {
169 case PEAK_TASK_FLAVOR_NTHREADS: /* Configure task's threads number */
170 if (info[0] == 0) /* 0 means auto configuration */
171 {
172 __peak_task_autospawn(task);
173 return 0;
174 }
175 else if (info[0] > 0)
176 {
177 __peak_task_respawn(task, info[0]);
178 return (task->_nthreads == info[0]) ? 0 : -1;
179 }
180 break;
181
182 case PEAK_TASK_FLAVOR_MAXFDS: /* Configure max number of open files */
183 if (info[0] > 0)
184 return _peak_engine_set_maxfds(task->_engine, info[0]);
185 break;
186
187 default:
188 break;
189 }
190 return -1;
191 }
192
193 const char *
peak_task_get_engine_name(peak_task task)194 peak_task_get_engine_name(peak_task task)
195 {
196 return _peak_engine_get_name(task->_engine);
197 }
198
199 void
peak_task_run(peak_task task)200 peak_task_run(peak_task task)
201 {
202 if (task->_nthreads == -1) /* Special autospawn default value */
203 {
204 task->_nthreads = 1; /* Only the master thread has been created */
205 __peak_task_autospawn(task);
206 }
207 TASK_SET_RUNNING(task);
208 _peak_engine_loop(task->_engine);
209 }
210
211 void
peak_task_break(peak_task task)212 peak_task_break(peak_task task)
213 {
214 _peak_engine_break(task->_engine);
215 TASK_SET_STOPPED(task);
216 }
217
218 static void
__peak_task_autospawn(peak_task task)219 __peak_task_autospawn(peak_task task)
220 {
221 if (task->_nthreads == -1) /* Special autospawn default value */
222 task->_nthreads = 1; /* Only the master thread has been created */
223
224 /* In most applications, the best we have is number of threads == ncpus */
225 __peak_task_respawn(task, task->_ncpus);
226 }
227
228 static void
__peak_task_respawn(peak_task task,int n)229 __peak_task_respawn(peak_task task, int n)
230 {
231 if (task->_nthreads == -1) /* Special autospawn default value */
232 task->_nthreads = 1; /* Only the master thread has been created */
233
234 if (task->_nthreads == n)
235 return; /* alright */
236
237 if (task->_nthreads < n)
238 {
239 /* Spawn missing threads */
240 __peak_task_spawn(task, n - task->_nthreads);
241 }
242 else
243 {
244 /* If too many threads, make some quit... */
245 do
246 {
247 _peak_task_runloop_quit(peak_task_runloop_stackpool_top(task->_pool));
248 peak_task_runloop_stackpool_pop(task->_pool);
249 }
250 while (task->_nthreads != n);
251 }
252 }
253
254 static void
__peak_task_spawn(peak_task task,int n)255 __peak_task_spawn(peak_task task, int n)
256 {
257 while (n--)
258 {
259 pthread_t t;
260
261 if (pthread_create(&t, NULL, _peak_task_runloop_start, task) == -1)
262 {
263 PEAK_WARN("pthread_create failed; cannot spawn as desired");
264 break;
265 }
266 peak_semaphore_wait(task->_hdlsem);
267 }
268 task->_nthreads = peak_task_runloop_stackpool_count(task->_pool);
269 if (task->_nthreads > 1 && !_peak_is_threaded)
270 _peak_is_threaded = 1;
271 }
272
273
274 /* Thread's synchronization: Acquire exclusive task execution.
275 * If you try to understand, see along with:
276 * _peak_task_process_pending_events()
277 * __peak_task_runloop_run().
278 */
279 void
peak_task_exclusivity()280 peak_task_exclusivity()
281 {
282 peak_task task;
283 peak_task_runloop rl;
284 int alone;
285
286 if (!_peak_is_threaded)
287 return;
288
289 task = peak_task_self();
290 #if 0
291 /* Enable this when peak supports multiple tasks */
292 if (task->_nthreads <= 1)
293 return;
294 #endif
295
296 /* If no or only one event being processed, don't bother :) */
297 if (task->_nevts <= 1)
298 return;
299
300 /* Multiple events, multiple threads configuration */
301 if ((rl = _peak_task_runloop_self()) == NULL)
302 PEAK_HALT;
303
304 _peak_spinlock_lock(&task->_exlock);
305 peak_atomic_inc(&task->_nexs); /* Increment exclusive count */
306 alone = (peak_atomic_read(&task->_nruns) == 1); /* Do the test */
307 _peak_spinlock_unlock(&task->_exlock); /* Test done, unlock */
308 if (alone)
309 peak_atomic_dec(&task->_nexs); /* We are the only one running! */
310 else
311 peak_semaphore_wait(task->_exsem); /* Wait for exclusivity. */
312
313 /* EXCLUSIVE EXECUTION */
314
315 /* Set runloop exclusivity bit. */
316 rl->_exclusivity = 1;
317 }
318
319 __private_extern__ void
_peak_task_schedule_engine_client(peak_task task,peak_engine_client c)320 _peak_task_schedule_engine_client(peak_task task, peak_engine_client c)
321 {
322 if (c->_task != NULL)
323 PEAK_HALT;
324
325 _peak_engine_add_client(task->_engine, c);
326
327 c->_task = task;
328 }
329
330 __private_extern__ void
_peak_task_unschedule_engine_client(peak_task task,peak_engine_client c)331 _peak_task_unschedule_engine_client(peak_task task, peak_engine_client c)
332 {
333 if (c->_task == NULL)
334 PEAK_HALT;
335
336 _peak_engine_remove_client(task->_engine, c);
337
338 c->_task = NULL;
339 }
340
341 void
peak_task_timer_add(peak_task task,peak_timer ti)342 peak_task_timer_add(peak_task task, peak_timer ti)
343 {
344 __peak_task_lock(task);
345
346 if (ti->_task)
347 {
348 assert(ti->_task == task);
349 __peak_task_timer_remove(ti->_task, ti);
350 }
351 else
352 peak_retain(ti);
353 ti->_task = task;
354 __peak_task_timer_insert(task, ti);
355
356 __peak_task_unlock(task);
357 }
358
359 void
peak_task_timer_remove(peak_task task,peak_timer ti)360 peak_task_timer_remove(peak_task task, peak_timer ti)
361 {
362 __peak_task_lock(task);
363
364 if (ti->_task == task)
365 {
366 __peak_task_timer_remove(task, ti);
367 ti->_task = NULL;
368 peak_release(ti);
369 }
370
371 __peak_task_unlock(task);
372 }
373
374 __private_extern__ void
_peak_task_timer_lock_configure(peak_task task,peak_timer ti,double fire,double interval)375 _peak_task_timer_lock_configure(peak_task task, peak_timer ti,
376 double fire, double interval)
377 {
378 if (task)
379 {
380 __peak_task_lock(task);
381
382 /* Be sure to remove it from the pending timers list first. */
383 __peak_task_timer_remove(task, ti);
384 }
385
386 /* Configure timer */
387 _peak_timer_configure(ti, fire, interval);
388
389 /* Apply necessary timer's requeueing if the timer t is associated
390 * with a task.
391 */
392 if (task)
393 {
394 __peak_task_timer_insert(task, ti);
395 __peak_task_unlock(task);
396 }
397 }
398
399 static void
__peak_task_timer_insert(peak_task task,peak_timer ti)400 __peak_task_timer_insert(peak_task task, peak_timer ti)
401 {
402 assert(ti->left == NULL && ti->right == NULL);
403
404 if (task->_tiroot)
405 {
406 int cmp;
407
408 /* Splay tree around timer. */
409 _peak_timer_tree_splay(ti, &task->_tiroot, NULL, NULL);
410
411 cmp = PEAK_TIMER_COMPARE(task->_tiroot, ti);
412
413 /* Insert new timer at the root of the tree. */
414 if (cmp > 0)
415 {
416 ti->right = task->_tiroot;
417 ti->left = ti->right->left;
418 ti->right->left = NULL;
419 }
420 else
421 {
422 ti->left = task->_tiroot;
423 ti->right = ti->left->right;
424 ti->left->right = NULL;
425 }
426
427 /* Update our fast first timer accessor. */
428 if (!ti->left)
429 task->_tifirst = ti;
430 }
431 else
432 {
433 /* Tree was empty. */
434 ti->left = ti->right = NULL;
435 task->_tifirst = ti;
436 }
437
438 task->_tiroot = ti;
439 }
440
441 static void
__peak_task_timer_remove(peak_task task,peak_timer ti)442 __peak_task_timer_remove(peak_task task, peak_timer ti)
443 {
444 if (!task->_tiroot)
445 return; /* nothing to remove at all */
446
447 _peak_timer_tree_splay(ti, &task->_tiroot, NULL, NULL);
448
449 if (task->_tiroot != ti)
450 return; /* Ignore for convenience, if not found. */
451
452 /* Update tifirst accessor. */
453 if (ti == task->_tifirst)
454 {
455 peak_timer node;
456 assert(ti->left == NULL); /* We were the first, damn it. */
457
458 node = ti->right;
459 if (node)
460 {
461 /* Find the leftmost element of the right subtree. */
462 while (node->left)
463 node = node->left;
464 task->_tifirst = node;
465 }
466 else
467 {
468 assert(ti->left == NULL && ti->right == NULL);
469 task->_tifirst = NULL;
470 }
471 }
472
473 if (ti->left)
474 {
475 task->_tiroot = ti->left;
476
477 if (ti->right)
478 {
479 peak_timer left = ti->left;
480
481 /* Hang the old root right child off the right-most leaf of the
482 * left child.
483 */
484 while (left->right)
485 left = left->right;
486 left->right = ti->right;
487 }
488 }
489 else
490 task->_tiroot = ti->right;
491
492 ti->left = NULL;
493 ti->right = NULL;
494 }
495
496 __private_extern__ int
_peak_task_time(time_t * nowp)497 _peak_task_time(time_t *nowp)
498 {
499 peak_task task = peak_task_self();
500
501 /* Use current time (cached) of associated task, if possible
502 */
503 if (task && TASK_IS_ON_TIME(task))
504 {
505 *nowp = (time_t)task->_current_time;
506 return 1;
507 }
508 return 0;
509 }
510
511 __private_extern__ int
_peak_task_time_float(double * nowp)512 _peak_task_time_float(double *nowp)
513 {
514 peak_task task = peak_task_self();
515
516 /* Use current time (cached) of associated task, if possible
517 */
518 if (task && TASK_IS_ON_TIME(task))
519 {
520 *nowp = task->_current_time;
521 return 1;
522 }
523 return 0;
524 }
525
526 __private_extern__ void
_peak_task_set_on_time(double now)527 _peak_task_set_on_time(double now)
528 {
529 peak_task task = peak_task_self();
530 if (task && TASK_IS_RUNNING(task))
531 {
532 task->_current_time = now;
533 task->_flags |= TASK_FLAG_ON_TIME;
534 }
535 }
536
537 __private_extern__ int
_peak_task_timer_mswait(peak_task task)538 _peak_task_timer_mswait(peak_task task)
539 {
540 if (task->_tifirst == NULL)
541 return -1; /* INFTIM */
542
543 /* +1 for ceiling */
544 return (int)((_peak_timer_expire_relative(task->_tifirst) * 1000.0) + 1);
545 }
546
547 struct timeval*
_peak_task_timer_tvwait(peak_task task,struct timeval * tv)548 _peak_task_timer_tvwait(peak_task task, struct timeval *tv)
549 {
550 #if 1
551 return task->_tifirst
552 ? _peak_timer_expire_relative_tv(task->_tifirst, tv)
553 : NULL;
554 #else
555 struct timeval *itv =
556 (task->_tifirst) ? _peak_timer_expire_relative_tv(task->_tifirst, tv)
557 : NULL;
558
559 if (itv)
560 printf("_peak_task_timer_timewait: tv->tv_sec=%d tv->tv_usec=%d\n",
561 itv->tv_sec, itv->tv_usec);
562
563 return itv;
564 #endif
565 }
566
567 __private_extern__ struct timespec*
_peak_task_timer_tswait(peak_task task,struct timespec * ts)568 _peak_task_timer_tswait(peak_task task, struct timespec *ts)
569 {
570 #if 1
571 return task->_tifirst
572 ? _peak_timer_expire_relative_ts(task->_tifirst, ts)
573 : NULL;
574 #else
575 struct timespec *_ts;
576
577 if (task->_tifirst == NULL)
578 return NULL;
579
580 _ts = _peak_timer_expire_relative_ts(task->_tifirst, ts);
581
582 if (_ts)
583 printf("ts->ts_sec=%d ts->nsec=%d\n", _ts->tv_sec, _ts->tv_nsec);
584 return _ts;
585 #endif
586 }
587
588 /* Max number of timers that can be fetched/scheduled in one event loop.
589 */
590 #define TIMER_PREFETCH_MAX 100
591
592 /* Check every node below n following an in-order traversal. This function
593 * returns the number of timers fetched.
594 */
595 static int
__peak_task_timer_prefetch(double now,peak_timer n,peak_timer * tp,int idx)596 __peak_task_timer_prefetch(double now, peak_timer n, peak_timer *tp, int idx)
597 {
598 if (!n || idx >= TIMER_PREFETCH_MAX)
599 return idx;
600
601 idx = __peak_task_timer_prefetch(now, n->left, tp, idx);
602
603 if (n->_fire > now || idx >= TIMER_PREFETCH_MAX)
604 return idx;
605
606 tp[idx++] = n;
607
608 return __peak_task_timer_prefetch(now, n->right, tp, idx);
609 }
610
611 __private_extern__ int
_peak_task_timer_schedule_fire(peak_task task)612 _peak_task_timer_schedule_fire(peak_task task)
613 {
614 peak_timer ti, tistore[TIMER_PREFETCH_MAX];
615 double now;
616 int i, result;
617
618 /* Don't waste time if we don't have any scheduled timer. */
619 if (!task->_tifirst)
620 return 0;
621
622 /* Get time: this call should cache the current time "set_on_time". */
623 now = peak_time_float();
624
625 /* Splay the tree before to move the first item up (prefetching should be
626 * faster and the recursion depth reduced).
627 */
628 _peak_timer_tree_splay(task->_tifirst, &task->_tiroot, NULL, NULL);
629
630 /* Prefetch pending timers so that we can then modify the splay tree as
631 * our wish.
632 */
633 result = __peak_task_timer_prefetch(now, task->_tiroot, tistore, 0);
634
635 /* Process prefetched list. */
636 for (i = 0; i < result; i++)
637 {
638 ti = tistore[i];
639
640 /* Remove timer */
641 __peak_task_timer_remove(task, ti);
642
643 /* Queue operation */
644 __peak_task_op_timer_schedule_fire(task, ti);
645
646 /* Rearm timer, but not before now. */
647 _peak_timer_rearm(ti, now);
648
649 /* Re-insert if in requeue mode */
650 if (ti->_mode & PEAK_TIMER_MODE_REQUEUE)
651 __peak_task_timer_insert(task, ti);
652 }
653 return result;
654 }
655
656 __private_extern__ void
_peak_task_op_ioevent_schedule(peak_task task,peak_engine_client c,int event,int info)657 _peak_task_op_ioevent_schedule(peak_task task, peak_engine_client c,
658 int event, int info)
659 {
660 peak_task_runloop rl;
661 peak_task_op op;
662
663 rl = peak_task_runloop_stackpool_get(task->_pool);
664
665 if (rl == NULL)
666 PEAK_HALT;
667
668 TASK_OP_IOEVENT_GEN(op, rl, c, event, info);
669 assert(op != NULL);
670
671 _peak_task_runloop_op_schedule(rl, op);
672 }
673
674 static void
__peak_task_op_timer_schedule_fire(peak_task task,peak_timer ti)675 __peak_task_op_timer_schedule_fire(peak_task task, peak_timer ti)
676 {
677 peak_task_runloop rl;
678 peak_task_op op;
679
680 rl = peak_task_runloop_stackpool_get(task->_pool);
681
682 if (rl == NULL)
683 PEAK_HALT;
684
685 TASK_OP_TIMER_GEN(op, rl, ti);
686 assert(op != NULL);
687
688 _peak_task_runloop_op_schedule(rl, op);
689 }
690
691 __private_extern__ void
_peak_task_process_pending_events(peak_task task,int nevts)692 _peak_task_process_pending_events(peak_task task, int nevts)
693 {
694 int i;
695
696 if (nevts == 0)
697 {
698 PEAK_WARN("_peak_task_process_pending_events() called without event!");
699 task->_flags &= ~TASK_FLAG_ON_TIME;
700 return;
701 }
702
703 /* Optimization: bypass some checks elsewhere if only one event
704 * is being processed, even if we have several threads.
705 */
706 task->_nevts = nevts;
707
708 /* Optimization: don't broadcast for a single event as we are sure
709 * it's gonna be dealed by the master runloop.
710 */
711 if (nevts > 1)
712 {
713 /* Number of running threads we are going to use: all. */
714 peak_atomic_set(&task->_nruns, task->_nthreads);
715
716 /* Signal our worker threads
717 */
718 if (task->_nthreads == 2)
719 peak_semaphore_signal(task->_runsem);
720 else if (task->_nthreads > 2)
721 peak_semaphore_signal_all(task->_runsem);
722 }
723 else
724 {
725 /* Number of running threads we are going to use: one. */
726 peak_atomic_set(&task->_nruns, 1);
727 }
728
729 /* "Master thread" now acts like another runloop, and always runs. */
730 _peak_task_runloop_run(task->_master, 1);
731
732 if (nevts > 1)
733 {
734 /* Wait for completion. */
735 for (i = task->_nthreads - 1; i > 0; i--)
736 peak_semaphore_wait(task->_hdlsem);
737 }
738 /* All done. */
739 peak_atomic_set(&task->_nruns, 0);
740 peak_task_runloop_stackpool_reset(task->_pool);
741 task->_nevts = 0;
742
743 /* Consider task not on time anymore */
744 if (task->_flags & TASK_FLAG_ON_TIME)
745 task->_flags &= ~TASK_FLAG_ON_TIME;
746 }
747