1 /* Copyright (c) 2012, 2021, Oracle and/or its affiliates.
2
3 This program is free software; you can redistribute it and/or modify
4 it under the terms of the GNU General Public License, version 2.0,
5 as published by the Free Software Foundation.
6
7 This program is also distributed with certain software (including
8 but not limited to OpenSSL) that is licensed under separate terms,
9 as designated in a particular file or component or in included license
10 documentation. The authors of MySQL hereby grant you an additional
11 permission to link the program and your derivative works with the
12 separately licensed software that they have included with MySQL.
13
14 This program is distributed in the hope that it will be useful,
15 but WITHOUT ANY WARRANTY; without even the implied warranty of
16 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 GNU General Public License, version 2.0, for more details.
18
19 You should have received a copy of the GNU General Public License
20 along with this program; if not, write to the Free Software
21 Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */
22
23 /**
24 @file task.c
25 Rudimentary, non-preemptive task system in portable C,
26 based on Tom Duff's switch-based coroutine trick
27 and a stack of environment structs. (continuations?)
28 Nonblocking IO and event handling need to be rewritten for each new OS.
29 The code is not MT-safe, but could be made safe by moving all global
30 variables into a context struct which could be the first parameter
31 to all the functions.
32 */
33 #if defined(linux) && !defined(_XOPEN_SOURCE)
34 #define _XOPEN_SOURCE
35 #endif
36
37 #if defined(linux) && !defined(_GNU_SOURCE)
38 #define _GNU_SOURCE
39 #endif
40
41 #include "x_platform.h"
42
43 #ifdef XCOM_HAVE_OPENSSL
44 #include "openssl/ssl.h"
45 #include "openssl/err.h"
46 #endif
47
48 #include <limits.h>
49 #include <stdlib.h>
50 #include "xcom_vp.h"
51 #include "node_connection.h"
52
53 #ifndef WIN
54 #include <arpa/inet.h>
55 #endif
56
57 #include <sys/types.h>
58 #include <sys/time.h>
59 #include <stdio.h>
60 #include <assert.h>
61 #include <errno.h>
62 #include <string.h>
63 #include <fcntl.h>
64
65 #include "task_debug.h"
66 #include "task_net.h"
67 #include "simset.h"
68 #include "task_os.h"
69 #include "task.h"
70 #include "xcom_cfg.h"
71 #ifndef _WIN32
72 #include <sys/poll.h>
73 #endif
74
75 #include "retry.h"
76 #include "xdr_utils.h"
77
78 extern char *pax_op_to_str(int x);
79
80 task_arg null_arg = {a_end,{0}};
81
82 struct iotasks ;
83 typedef struct iotasks iotasks;
84
85 typedef struct {
86 u_int pollfd_array_len;
87 pollfd *pollfd_array_val;
88 } pollfd_array;
89
90 typedef task_env* task_env_p;
91
92 typedef struct {
93 u_int task_env_p_array_len;
94 task_env_p *task_env_p_array_val;
95 } task_env_p_array;
96
97 init_xdr_array(pollfd)
98 free_xdr_array(pollfd)
99 set_xdr_array(pollfd)
100 get_xdr_array(pollfd)
101 init_xdr_array(task_env_p)
102 free_xdr_array(task_env_p)
103 set_xdr_array(task_env_p)
104 get_xdr_array(task_env_p)
105
106 struct iotasks {
107 int nwait;
108 pollfd_array fd;
109 task_env_p_array tasks;
110 };
111
112 int task_errno = 0;
113 static task_env *extract_first_delayed();
114 static task_env *task_ref(task_env *t);
115 static task_env *task_unref(task_env *t);
116 static void wake_all_io();
117 static void task_sys_deinit();
118
119 /* Return time as seconds */
120 static double _now = 0.0;
121
task_now()122 double task_now()
123 {
124 return _now;
125 }
126
127
128 #ifdef WIN
gettimeofday(struct timeval * tp,struct timezone * tzp)129 int gettimeofday(struct timeval * tp, struct timezone * tzp)
130 {
131 static uint64_t const EPOCH = ((uint64_t) 116444736000000000ULL);
132
133 SYSTEMTIME system_time;
134 FILETIME file_time;
135 uint64_t time;
136
137 GetSystemTime( &system_time );
138 SystemTimeToFileTime( &system_time, &file_time );
139 time = ((uint64_t)file_time.dwLowDateTime ) ;
140 time += ((uint64_t)file_time.dwHighDateTime) << 32;
141
142 tp->tv_sec = (long) ((time - EPOCH) / 10000000L);
143 tp->tv_usec = (long) (system_time.wMilliseconds * 1000);
144 return 0;
145 }
146 #endif
147
seconds(void)148 double seconds(void)
149 {
150 struct timeval tv;
151 if (gettimeofday(&tv, 0) < 0)
152 return - 1.0;
153 return _now = (double)tv.tv_sec + (double)tv.tv_usec / 1000000.0;
154 }
155
156
157 #ifdef NOTDEF
task_queue_init(task_queue * q)158 static void task_queue_init(task_queue *q)
159 {
160 q->curn = 0;
161 }
162
task_queue_debug(task_queue * q)163 static void task_queue_debug(task_queue *q)
164 {
165 int i;
166 GET_GOUT;
167 STRLIT("task_queue_debug ");
168 for (i = 1; i <= q->curn; i++) {
169 NDBG(i, d);
170 PTREXP(q->x[i]);
171 STREXP(q->x[i]->name);
172 NDBG(q->x[i]->heap_pos, d);
173 NDBG(q->x[i]->terminate, d);
174 NDBG(q->x[i]->time, f);
175 }
176 PRINT_GOUT;
177 FREE_GOUT;
178 }
179
180
is_heap(task_queue * q)181 static int is_heap(task_queue *q)
182 {
183 if (q->curn) {
184 int i;
185 for (i = q->curn; i > 1; i--) {
186 if ((q->x[i]->time < q->x[i/2]->time) ||
187 (q->x[i]->heap_pos != i)) {
188 task_queue_debug(q);
189 return 0;
190 }
191 }
192 if (q->x[1]->heap_pos != 1) {
193 task_queue_debug(q);
194 return 0;
195 }
196 }
197 return 1;
198 }
199
task_queue_full(task_queue * q)200 static int task_queue_full(task_queue *q)
201 {
202 /* assert(is_heap(q)); */
203 return q->curn >= MAXTASKS;
204 }
205
206
207 #endif
208
209 #define FIX_POS(i) q->x[i]->heap_pos = (i)
210 /* #define TASK_SWAP(x,y) { task_env *tmp = (x); (x) = (y); (y) = (tmp); } */
211 #define TASK_SWAP(i,j) { task_env *tmp = q->x[i]; q->x[i] = q->x[j]; q->x[j] = tmp; FIX_POS(i); FIX_POS(j); }
212 #define TASK_MOVE(i,j) { q->x[i] = q->x[j]; FIX_POS(i); }
213 /* Put the task_env* at index n in its right place when Heap(1,n-1) */
task_queue_siftup(task_queue * q,int n)214 static void task_queue_siftup(task_queue *q, int n)
215 {
216 int i = n;
217 int p;
218 assert(n >= 0);
219 /* Heap(1,n-1) */
220 for (; ; ) {
221 if (i == 1)
222 break; /* Reached root */
223 p = i / 2; /* Find parent */
224 if (q->x[p]->time <= q->x[i]->time)
225 break; /* We have reached correct place */
226 TASK_SWAP(p, i);
227 i = p;
228 }
229 /* Heap(1,n) */
230 }
231
232
233 /* Put the task_env* at index l in its right place when Heap(l+1,n) */
task_queue_siftdown(task_queue * q,int l,int n)234 static void task_queue_siftdown(task_queue *q, int l, int n)
235 {
236 int i = l;
237 int c;
238 assert(n >= 0);
239 /* Heap(l+1,,n) */
240 for (; ; ) {
241 c = 2 * i; /* First child */
242 if (c > n)
243 break; /* Outside heap */
244 if (c + 1 <= n) /* We have second child */
245 if (q->x[c+1]->time < q->x[c]->time) /* Select lesser child */
246 c++;
247 if (q->x[i]->time <= q->x[c]->time)
248 break; /* We have reached correct place */
249 TASK_SWAP(c, i);
250 i = c;
251 }
252 /* Heap(l,n) */
253 }
254
255
256 /* Remove any element from the heap */
task_queue_remove(task_queue * q,int i)257 static task_env *task_queue_remove(task_queue *q, int i)
258 {
259 task_env * tmp = q->x[i]; /* Will return this */
260 assert(q->curn);
261 /* assert(is_heap(q)); */
262 MAY_DBG(FN; STRLIT("task_queue_remove "); NDBG(i, d));
263 /* task_queue_debug(q); */
264 TASK_MOVE(i, q->curn); /* Fill the hole */
265 q->curn--; /* Heap is now smaller */
266 /* Re-establish heap property */
267 if (q->curn) {
268 int p = i / 2;
269 if (p && q->x[p]->time > q->x[i]->time)
270 task_queue_siftup(q, i);
271 else
272 task_queue_siftdown(q, i, q->curn);
273 }
274 /* task_queue_debug(q); */
275 /* assert(is_heap(q)); */
276 tmp->heap_pos = 0;
277 return task_unref(tmp);
278 }
279
280
281 /* Insert task_env * in queue */
task_queue_insert(task_queue * q,task_env * t)282 static void task_queue_insert(task_queue *q, task_env *t)
283 {
284 assert(t->heap_pos == 0);
285 assert(q->curn < MAXTASKS);
286 /* assert(is_heap(q)); */
287 q->curn++;
288 q->x[q->curn] = t;
289 FIX_POS(q->curn);
290 /* Heap(1,n-1) */
291 task_queue_siftup(q, q->curn);
292 /* Heap(1,n) */
293 /* assert(is_heap(q)); */
294 }
295
296
task_queue_empty(task_queue * q)297 static int task_queue_empty(task_queue *q)
298 {
299 /* assert(is_heap(q)); */
300 return q->curn < 1;
301 }
302
303
task_queue_min(task_queue * q)304 static task_env *task_queue_min(task_queue *q)
305 {
306 /* assert(is_heap(q)); */
307 assert(q->curn >= 1);
308 return q->x[1];
309 }
310
311
312 /* Extract first task_env * from queue */
task_queue_extractmin(task_queue * q)313 static task_env *task_queue_extractmin(task_queue *q)
314 {
315 task_env * tmp;
316 assert(q);
317 assert(q->curn >= 1);
318 /* assert(is_heap(q)); */
319 /* task_queue_debug(q); */
320 tmp = q->x[1];
321 TASK_MOVE(1, q->curn);
322 q->x[q->curn] = 0;
323 q->curn--;
324 /* Heap(2,n) */
325 if (q->curn)
326 task_queue_siftdown(q, 1, q->curn);
327 /* Heap(1,n) */
328 /* task_queue_debug(q); */
329 /* assert(is_heap(q)); */
330 tmp->heap_pos = 0;
331 return tmp;
332 }
333
334
335 static linkage ash_nazg_gimbatul = {0,&ash_nazg_gimbatul,&ash_nazg_gimbatul}; /* One ring to bind them all */
336
337 static void task_init(task_env *p);
338 void *task_allocate(task_env *p, unsigned int bytes);
339 /**
340 Initialize task memory
341 */
task_init(task_env * t)342 static void task_init(task_env *t)
343 {
344 link_init(&t->l, type_hash("task_env"));
345 link_init(&t->all, type_hash("task_env"));
346 t->heap_pos = 0;
347 /* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
348 /* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
349 assert(ash_nazg_gimbatul.type == type_hash("task_env"));
350 /* #ifdef __sun */
351 /* mem_watch(&ash_nazg_gimbatul,sizeof(&ash_nazg_gimbatul), 0); */
352 /* #endif */
353 link_into(&t->all, &ash_nazg_gimbatul); /* Put it in the list of all tasks */
354 /* #ifdef __sun */
355 /* mem_watch(&ash_nazg_gimbatul,sizeof(&ash_nazg_gimbatul), WA_WRITE); */
356 /* #endif */
357 assert(ash_nazg_gimbatul.type == type_hash("task_env"));
358 /* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
359 /* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
360 t->terminate = RUN;
361 t->refcnt = 0;
362 t->taskret = 0;
363 t->time = 0.0;
364 t->arg = null_arg;
365 t->where = t->buf;
366 t->stack_top = &t->buf[TASK_POOL_ELEMS-1];
367 t->sp = t->stack_top;
368 memset(t->buf, 0, TASK_POOL_ELEMS * sizeof(TaskAlign));
369 }
370
371
372 static linkage tasks = {0,&tasks,&tasks};
373 static task_queue task_time_q;
374 static linkage free_tasks = {0,&free_tasks,&free_tasks};
375 /* Basic operations on tasks */
activate(task_env * t)376 static task_env *activate(task_env *t)
377 {
378 if (t) {
379 MAY_DBG(FN;
380 STRLIT("activating task ");
381 PTREXP(t);
382 STREXP(t->name);
383 NDBG(t->heap_pos, d);
384 NDBG(t->time, f);
385 );
386 assert(ash_nazg_gimbatul.type == type_hash("task_env"));
387 if (t->heap_pos)
388 task_queue_remove(&task_time_q, t->heap_pos);
389 link_into(&t->l, &tasks);
390 t->time = 0.0;
391 t->heap_pos = 0;
392 assert(ash_nazg_gimbatul.type == type_hash("task_env"));
393 }
394 return t;
395 }
396
397
deactivate(task_env * t)398 static task_env *deactivate(task_env *t)
399 {
400 if (t) {
401 assert(ash_nazg_gimbatul.type == type_hash("task_env"));
402 link_out(&t->l);
403 assert(ash_nazg_gimbatul.type == type_hash("task_env"));
404 }
405 return t;
406 }
407
408
task_delay_until(double time)409 void task_delay_until(double time)
410 {
411 if (stack) {
412 stack->time = time;
413 task_queue_insert(&task_time_q, task_ref(deactivate(stack)));
414 }
415 }
416
417
418 /* Wait queues */
task_wait(task_env * t,linkage * queue)419 void task_wait(task_env *t, linkage *queue)
420 {
421 if (t) {
422 TASK_DEBUG("task_wait");
423 deactivate(t);
424 link_into(&t->l, queue);
425 }
426 }
427
428
task_wakeup(linkage * queue)429 void task_wakeup(linkage *queue)
430 {
431 assert(queue);
432 assert(queue != &tasks);
433 while (!link_empty(queue)) {
434 activate(container_of(link_extract_first(queue), task_env, l));
435 TASK_DEBUG("task_wakeup");
436 }
437 }
438
439
task_wakeup_first(linkage * queue)440 static void task_wakeup_first(linkage *queue)
441 {
442 assert(queue);
443 assert(queue != &tasks);
444 if (!link_empty(queue)) {
445 activate(container_of(link_extract_first(queue), task_env, l));
446 TASK_DEBUG("task_wakeup_first");
447 }
448 }
449
450
451 /* Channels */
channel_init(channel * c,unsigned int type)452 channel *channel_init(channel *c, unsigned int type)
453 {
454 link_init(&c->data, type);
455 link_init(&c->queue, type_hash("task_env"));
456 return c;
457 }
458
459 /* purecov: begin deadcode */
channel_new()460 channel *channel_new()
461 {
462 channel * c = malloc(sizeof(channel));
463 channel_init(c, NULL_TYPE);
464 return c;
465 }
466 /* purecov: end */
467
channel_put(channel * c,linkage * data)468 void channel_put(channel *c, linkage *data)
469 {
470 MAY_DBG(FN; PTREXP(data); PTREXP(&c->data));
471 link_into(data, &c->data);
472 task_wakeup_first(&c->queue);
473 }
474
475
channel_put_front(channel * c,linkage * data)476 void channel_put_front(channel *c, linkage *data)
477 {
478 link_follow(data, &c->data);
479 task_wakeup_first(&c->queue);
480 }
481
482
483 static int active_tasks = 0;
task_new(task_func func,task_arg arg,const char * name,int debug)484 task_env *task_new(task_func func, task_arg arg, const char *name, int debug)
485 {
486 task_env * t;
487 if (link_empty(&free_tasks))
488 t = malloc(sizeof(task_env));
489 else
490 t = container_of(link_extract_first(&free_tasks), task_env, l);
491 DBGOUT(FN; PTREXP(t); STREXP(name));
492 task_init(t);
493 t->func = func;
494 t->arg = arg;
495 t->name = name;
496 t->debug = debug;
497 t->waitfd = -1;
498 t->interrupt = 0;
499 activate(t);
500 task_ref(t);
501 active_tasks++;
502 return t;
503 }
504
505
506 /**Allocate bytes from pool, initialized to zero */
task_allocate(task_env * p,unsigned int bytes)507 void *task_allocate(task_env *p, unsigned int bytes)
508 {
509 /* TaskAlign to boundary */
510 unsigned int alloc_units = (unsigned int)
511 ((bytes + sizeof(TaskAlign) -1) / sizeof(TaskAlign));
512 TaskAlign * ret;
513 /* Check if there is space */
514 TASK_DEBUG("task_allocate");
515 if ((p->where + alloc_units) <= (p->stack_top)) {
516 ret = p->where;
517 p->where += alloc_units;
518 memset(ret, 0, alloc_units * sizeof(TaskAlign));
519 } else {
520 ret = 0;
521 abort();
522 }
523 return ret;
524 }
525
526
reset_state(task_env * p)527 void reset_state(task_env *p)
528 {
529 if ((p->where) <= (p->stack_top - 1)) {
530 TASK_DEBUG("reset_state");
531 p->stack_top[-1].state = 0;
532 } else {
533 abort();
534 }
535 }
536
537
pushp(task_env * p,void * ptr)538 void pushp(task_env *p, void *ptr)
539 {
540 assert(ptr);
541 if ((p->where) <= (p->stack_top - 1)) {
542 p->stack_top->ptr = ptr;
543 p->stack_top--;
544 TASK_DEBUG("pushp");
545 } else {
546 abort();
547 }
548 }
549
550
popp(task_env * p)551 void popp(task_env *p)
552 {
553 if (p->stack_top < &p->buf[TASK_POOL_ELEMS]) {
554 TASK_DEBUG("popp");
555 p->stack_top++;
556 } else {
557 abort();
558 }
559 }
560
561
runnable_tasks()562 static int runnable_tasks()
563 {
564 return !link_empty(&tasks);
565 }
566
567
delayed_tasks()568 static int delayed_tasks()
569 {
570 return !task_queue_empty(&task_time_q);
571 }
572
573
task_delete(task_env * t)574 static void task_delete(task_env *t)
575 {
576 DBGOUT(FN; PTREXP(t); STREXP(t->name); NDBG(t->refcnt, d));
577 link_out(&t->all); /* Remove task from list of all tasks */
578 #if 1
579 free(deactivate(t)); /* Deactivate and free task */
580 #else
581 deactivate(t);
582 link_into(&t->l, &free_tasks);
583 #endif
584 active_tasks--;
585 }
586
587
task_ref(task_env * t)588 static task_env *task_ref(task_env *t)
589 {
590 if (t) {
591 t->refcnt++;
592 }
593 return t;
594 }
595
596
task_unref(task_env * t)597 static task_env *task_unref(task_env *t)
598 {
599 if (t) {
600 t->refcnt--;
601 if (t->refcnt == 0) {
602 task_delete(t);
603 return NULL;
604 }
605 }
606 return t;
607 }
608
609
task_activate(task_env * t)610 task_env *task_activate(task_env *t)
611 {
612 return activate(t);
613 }
614
615
task_deactivate(task_env * t)616 task_env *task_deactivate(task_env *t)
617 {
618 return deactivate(t);
619 }
620
621
622 /* Set terminate flag and activate task */
task_terminate(task_env * t)623 task_env *task_terminate(task_env *t) {
624 if (t) {
625 DBGOUT(FN; PTREXP(t); STREXP(t->name); NDBG(t->refcnt, d));
626 t->terminate = KILL; /* Set terminate flag */
627 activate(t); /* and get it running */
628 }
629 return t;
630 }
631
632
633 /* Call task_terminate on all tasks */
task_terminate_all()634 void task_terminate_all()
635 {
636 /* First, activate all tasks which wait for timeout */
637 while (delayed_tasks()) {
638 task_env * t = extract_first_delayed(); /* May be NULL */
639 if (t)
640 activate(t); /* Make it runnable */
641 }
642 /* Then wake all tasks waiting for IO */
643 wake_all_io();
644 /* At last, terminate everything */
645 /* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
646 /* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
647 FWD_ITER(&ash_nazg_gimbatul, task_env,
648 task_terminate(container_of(link_iter, task_env, all));
649 );
650 }
651
652
first_delayed()653 static task_env *first_delayed()
654 {
655 return task_queue_min(&task_time_q);
656 }
657
658
extract_first_delayed()659 static task_env *extract_first_delayed()
660 {
661 task_env * ret = task_queue_extractmin(&task_time_q);
662 ret->time = 0.0;
663 return task_unref(ret);
664 }
665
666
667 static iotasks iot;
668
iotasks_init(iotasks * iot)669 static void iotasks_init(iotasks *iot)
670 {
671 DBGOUT(FN);
672 iot->nwait = 0;
673 init_pollfd_array(&iot->fd);
674 init_task_env_p_array(&iot->tasks);
675 }
676
iotasks_deinit(iotasks * iot)677 static void iotasks_deinit(iotasks *iot)
678 {
679 DBGOUT(FN);
680 iot->nwait = 0;
681 free_pollfd_array(&iot->fd);
682 free_task_env_p_array(&iot->tasks);
683 }
684
685 #if TASK_DBUG_ON
poll_debug()686 static void poll_debug()
687 {
688 int i = 0;
689 for (i = 0; i < iot.nwait; i++) {
690 NDBG(i,d); PTREXP(iot.tasks[i]); NDBG(iot.fd[i].fd,d); NDBG(iot.fd[i].events,d); NDBG(iot.fd[i].revents,d);
691 }
692 }
693 #endif
694
poll_wakeup(int i)695 static void poll_wakeup(int i)
696 {
697 activate(task_unref(get_task_env_p(&iot.tasks,i)));
698 set_task_env_p(&iot.tasks, NULL,i);
699 iot.nwait--; /* Shrink array of pollfds */
700 set_pollfd(&iot.fd, get_pollfd(&iot.fd,iot.nwait),i);
701 set_task_env_p(&iot.tasks, get_task_env_p(&iot.tasks,iot.nwait),i);
702 }
703
poll_wait(int ms)704 static int poll_wait(int ms) {
705 result nfds = {0, 0};
706 int wake = 0;
707
708 /* Wait at most ms milliseconds */
709 MAY_DBG(FN; NDBG(ms, d));
710 if (ms < 0 || ms > 1000) ms = 1000; /* Wait at most 1000 ms */
711 SET_OS_ERR(0);
712 while ((nfds.val = poll(iot.fd.pollfd_array_val, iot.nwait, ms)) == -1) {
713 nfds.funerr = to_errno(GET_OS_ERR);
714 if (nfds.funerr != SOCK_EINTR) {
715 task_dump_err(nfds.funerr);
716 MAY_DBG(FN; STRLIT("poll failed"));
717 abort();
718 }
719 SET_OS_ERR(0);
720 }
721 /* Wake up ready tasks */
722 {
723 int i = 0;
724 int interrupt = 0;
725 while (i < iot.nwait) {
726 interrupt =
727 (get_task_env_p(&iot.tasks,i)->time != 0.0 &&
728 get_task_env_p(&iot.tasks,i)->time < task_now());
729 if (interrupt || /* timeout ? */
730 get_pollfd(&iot.fd,i).revents) {
731 /* if(iot.fd[i].revents & POLLERR) abort(); */
732 get_task_env_p(&iot.tasks,i)->interrupt = interrupt;
733 poll_wakeup(i);
734 wake = 1;
735 } else {
736 i++;
737 }
738 }
739 }
740 return wake;
741 }
742
add_fd(task_env * t,int fd,int op)743 static void add_fd(task_env *t, int fd, int op) {
744 int events = 'r' == op ? POLLIN | POLLRDNORM : POLLOUT;
745 MAY_DBG(FN; PTREXP(t); NDBG(fd, d); NDBG(op, d));
746 assert(fd >= 0);
747 t->waitfd = fd;
748 deactivate(t);
749 task_ref(t);
750 set_task_env_p(&iot.tasks, t, iot.nwait);
751 {
752 pollfd x;
753 x.fd = fd;
754 x.events = events;
755 x.revents = 0;
756 set_pollfd(&iot.fd, x, iot.nwait);
757 }
758 iot.nwait++;
759 }
760
unpoll(int i)761 void unpoll(int i) {
762 task_unref(get_task_env_p(&iot.tasks, i));
763 set_task_env_p(&iot.tasks, NULL,i);
764 {
765 pollfd x;
766 x.fd = -1;
767 x.events = 0;
768 x.revents = 0;
769 set_pollfd(&iot.fd, x, i);
770 }
771 }
772
wake_all_io()773 static void wake_all_io() {
774 int i;
775 for (i = 0; i < iot.nwait; i++) {
776 activate(get_task_env_p(&iot.tasks,i));
777 unpoll(i);
778 }
779 iot.nwait = 0;
780 }
781
remove_and_wakeup(int fd)782 void remove_and_wakeup(int fd) {
783 int i = 0;
784 MAY_DBG(FN; NDBG(fd, d));
785 while (i < iot.nwait) {
786 if (get_pollfd(&iot.fd,i).fd == fd) {
787 poll_wakeup(i);
788 } else {
789 i++;
790 }
791 }
792 }
793
794 task_env *stack = NULL;
795
wait_io(task_env * t,int fd,int op)796 task_env *wait_io(task_env *t, int fd, int op)
797 {
798 t->time = 0.0;
799 t->interrupt = 0;
800 add_fd(deactivate(t), fd, op);
801 return t;
802 }
803
804
timed_wait_io(task_env * t,int fd,int op,double timeout)805 static task_env *timed_wait_io(task_env *t, int fd, int op, double timeout)
806 {
807 t->time = task_now() + timeout;
808 t->interrupt = 0;
809 add_fd(deactivate(t), fd, op);
810 return t;
811 }
812
813
814 static uint64_t send_count;
815 static uint64_t receive_count;
816 static uint64_t send_bytes;
817 static uint64_t receive_bytes;
818
819 #ifdef XCOM_HAVE_OPENSSL
con_read(connection_descriptor const * rfd,void * buf,int n)820 result con_read(connection_descriptor const *rfd, void *buf, int n)
821 {
822 result ret = {0,0};
823
824 if (rfd->ssl_fd) {
825 ERR_clear_error();
826 ret.val = SSL_read(rfd->ssl_fd, buf, n);
827 ret.funerr = to_ssl_err(SSL_get_error(rfd->ssl_fd, ret.val));
828 } else {
829 SET_OS_ERR(0);
830 ret.val = (int)recv(rfd->fd, buf, (size_t)n, 0);
831 ret.funerr = to_errno(GET_OS_ERR);
832 }
833 return ret;
834 }
835 #else
con_read(connection_descriptor const * rfd,void * buf,int n)836 result con_read(connection_descriptor const *rfd, void *buf, int n)
837 {
838 result ret = {0,0};
839
840 SET_OS_ERR(0);
841 ret.val = recv(rfd->fd, buf, (size_t)n, 0);
842 ret.funerr = to_errno(GET_OS_ERR);
843
844 return ret;
845 }
846 #endif
847
848 /*
849 It just reads no more than INT_MAX bytes. Caller should call it again for
850 read more than INT_MAX bytes.
851
852 Either the bytes written or an error number is returned to the caller through
853 'ret' argument. Error number is always negative integers.
854 */
task_read(connection_descriptor const * con,void * buf,int n,int64_t * ret)855 int task_read(connection_descriptor const* con, void *buf, int n, int64_t *ret)
856 {
857 DECL_ENV
858 int dummy;
859 END_ENV;
860
861 result sock_ret = {0,0};
862 *ret= 0;
863
864 assert(n >= 0);
865
866 TASK_BEGIN
867 MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); PTREXP(buf); NDBG(n,d));
868
869 for(;;){
870 if(con->fd <= 0)
871 TASK_FAIL;
872 sock_ret = con_read(con, buf, n);
873 *ret = sock_ret.val;
874 task_dump_err(sock_ret.funerr);
875 MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); PTREXP(buf); NDBG(n,d));
876 if(sock_ret.val >= 0 || (! can_retry_read(sock_ret.funerr)))
877 break;
878 wait_io(stack, con->fd, 'r');
879 TASK_YIELD;
880 MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); PTREXP(buf); NDBG(n,d));
881 }
882
883 assert(!can_retry_read(sock_ret.funerr));
884 FINALLY
885 receive_count++;
886 if(*ret > 0)
887 receive_bytes += (uint64_t)(*ret);
888 TASK_END;
889 }
890
891 #ifdef XCOM_HAVE_OPENSSL
con_write(connection_descriptor const * wfd,void * buf,int n)892 result con_write(connection_descriptor const *wfd, void *buf, int n)
893 {
894 result ret = {0,0};
895
896 assert(n >0);
897
898 if (wfd->ssl_fd) {
899 ERR_clear_error();
900 ret.val = SSL_write(wfd->ssl_fd, buf, n);
901 ret.funerr = to_ssl_err(SSL_get_error(wfd->ssl_fd, ret.val));
902 } else {
903 SET_OS_ERR(0);
904 ret.val = (int)send(wfd->fd, buf, (size_t)n, 0);
905 ret.funerr = to_errno(GET_OS_ERR);
906 }
907 return ret;
908 }
909 #else
con_write(connection_descriptor const * wfd,void * buf,int n)910 result con_write(connection_descriptor const *wfd, void *buf, int n)
911 {
912 result ret = {0,0};
913
914 assert(n >0);
915
916 SET_OS_ERR(0);
917 ret.val = send(wfd->fd, buf, (size_t)n, 0);
918 ret.funerr = to_errno(GET_OS_ERR);
919 return ret;
920 }
921 #endif
922
923 /*
924 It writes no more than UINT_MAX bytes which is the biggest size of
925 paxos message.
926
927 Either the bytes written or an error number is returned to the caller through
928 'ret' argument. Error number is always negative integers.
929 */
task_write(connection_descriptor const * con,void * _buf,uint32_t n,int64_t * ret)930 int task_write(connection_descriptor const *con, void *_buf, uint32_t n,
931 int64_t *ret)
932 {
933 char *buf = (char *) _buf;
934 DECL_ENV
935 uint32_t total; /* Keeps track of number of bytes written so far */
936 END_ENV;
937 result sock_ret = {0,0};
938
939 TASK_BEGIN
940 ep->total = 0;
941 *ret= 0;
942 while (ep->total < n) {
943 MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); NDBG(n - ep->total,u));
944 for(;;){
945 if(con->fd <= 0)
946 TASK_FAIL;
947 /*
948 con_write can only write messages that their sizes don't exceed
949 INT_MAX bytes. We should never pass a length bigger than INT_MAX
950 to con_write.
951 */
952 sock_ret = con_write(con, buf + ep->total, n-ep->total >= INT_MAX ?
953 INT_MAX : (int)(n-ep->total));
954 task_dump_err(sock_ret.funerr);
955 MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); NDBG(sock_ret.val, d));
956 if(sock_ret.val >= 0 || (! can_retry_write(sock_ret.funerr)))
957 break;
958 wait_io(stack, con->fd, 'w');
959 MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); NDBG(n - ep->total,u));
960 TASK_YIELD;
961 }
962 if (0 == sock_ret.val) { /* We have successfully written n bytes */
963 TERMINATE;
964 } else if (sock_ret.val < 0) { /* Something went wrong */
965 TASK_FAIL;
966 } else {
967 /* Add number of bytes written to total */
968 ep->total += (uint32_t)sock_ret.val;
969 }
970 }
971 assert(ep->total == n);
972 TASK_RETURN(ep->total);
973 FINALLY
974 send_count++;
975 send_bytes += ep->total;
976 TASK_END;
977 }
978
979
unblock_fd(int fd)980 int unblock_fd(int fd)
981 {
982 #if !defined(WIN32) && !defined(WIN64)
983 int x = fcntl(fd, F_GETFL, 0);
984 x = fcntl(fd, F_SETFL, x | O_NONBLOCK);
985 #else
986 /**
987 * On windows we toggle the FIONBIO flag directly
988 *
989 * Undocumented in MSDN:
990 * Calling ioctlsocket(FIONBIO) to an already set state
991 * seems to return -1 and WSAGetLastError() == 0.
992 */
993 u_long nonblocking = 1; /** !0 == non-blocking */
994 int x = ioctlsocket(fd, FIONBIO, &nonblocking);
995 #endif
996 return x;
997 }
998
999
block_fd(int fd)1000 int block_fd(int fd)
1001 {
1002 #if !defined(WIN32) && !defined(WIN64)
1003 int x = fcntl(fd, F_GETFL, 0);
1004 x = fcntl(fd, F_SETFL, x & ~O_NONBLOCK);
1005 #else
1006 /**
1007 * On windows we toggle the FIONBIO flag directly.
1008 *
1009 * Undocumented in MSDN:
1010 * Calling ioctlsocket(FIONBIO) to an already set state seems to
1011 * return -1 and WSAGetLastError() == 0.
1012 */
1013 u_long nonblocking = 0; /** 0 == blocking */
1014 int x = ioctlsocket(fd, FIONBIO, &nonblocking);
1015 #endif
1016 return x;
1017 }
1018
1019 /* purecov: begin deadcode */
is_only_task()1020 int is_only_task()
1021 {
1022 return link_first(&tasks) == link_last(&tasks);
1023 }
1024 /* purecov: end */
1025
first_runnable()1026 static task_env *first_runnable()
1027 {
1028 return (task_env * )link_first(&tasks);
1029 }
1030
1031
next_task(task_env * t)1032 static task_env *next_task(task_env *t)
1033 {
1034 return (task_env * )link_first(&t->l);
1035 }
1036
1037
is_task_head(task_env * t)1038 static int is_task_head(task_env *t)
1039 {
1040 return & t->l == &tasks;
1041 }
1042
1043
msdiff(double time)1044 static int msdiff(double time)
1045 {
1046 return (int)(1000.5 * (first_delayed()->time - time));
1047 }
1048
1049
1050 static double idle_time = 0.0;
task_loop()1051 void task_loop()
1052 {
1053 /* While there are tasks */
1054 for(;;) {
1055 task_env * t = first_runnable();
1056 /* While runnable tasks */
1057 while (runnable_tasks()) {
1058 task_env * next = next_task(t);
1059 if (!is_task_head(t)) {
1060 /* DBGOUT(FN; PTREXP(t); STRLIT(t->name ? t->name : "TASK WITH NO NAME")); */
1061 stack = t;
1062 assert(stack);
1063 assert(t->terminate != TERMINATED);
1064 if (stack->debug)
1065 /* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
1066 /* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
1067 assert(ash_nazg_gimbatul.type == type_hash("task_env"));
1068 {
1069 /* double when = seconds(); */
1070 int val = 0;
1071 assert(t->func);
1072 assert(stack == t);
1073 val = t->func(t->arg);
1074 /* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
1075 /* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
1076 assert(ash_nazg_gimbatul.type == type_hash("task_env"));
1077 if (!val) { /* Is task finished? */
1078 deactivate(t);
1079 t->terminate = TERMINATED;
1080 task_unref(t);
1081 stack = NULL;
1082 }
1083 }
1084 }
1085 t = next;
1086 }
1087 if (active_tasks <= 0)
1088 break;
1089 /* When we get here, there are no runnable tasks left.
1090 Wait until something happens.
1091 */
1092 {
1093 double time = seconds();
1094 if (delayed_tasks()) {
1095 int ms = msdiff(time);
1096 if (ms > 0) {
1097 if (the_app_xcom_cfg != NULL && the_app_xcom_cfg->m_poll_spin_loops)
1098 {
1099 u_int busyloop;
1100 for(busyloop = 0; busyloop < the_app_xcom_cfg->m_poll_spin_loops; busyloop++){
1101 ADD_WAIT_EV(task_now(), __FILE__, __LINE__, "poll_wait(ms)", 0);
1102 if(poll_wait(0)) /*Just poll */
1103 goto done_wait;
1104 ADD_WAIT_EV(task_now(), __FILE__, __LINE__, "poll_wait(ms) end", 0);
1105 thread_yield();
1106 }
1107 }
1108 ADD_WAIT_EV(task_now(), __FILE__, __LINE__, "poll_wait(ms)", ms);
1109 poll_wait(ms); /* Wait at most ms milliseconds and poll for IO */
1110 ADD_WAIT_EV(task_now(), __FILE__, __LINE__, "poll_wait(ms) end", ms);
1111 }
1112 done_wait:
1113 /* While tasks with expired timers */
1114 while (delayed_tasks() &&
1115 msdiff(time) <= 0) {
1116 task_env * t = extract_first_delayed(); /* May be NULL */
1117 if (t)
1118 activate(t); /* Make it runnable */
1119 }
1120 } else {
1121 ADD_T_EV(task_now(), __FILE__, __LINE__, "poll_wait(-1)");
1122 poll_wait(-1); /* Wait and poll for IO */
1123 ADD_T_EV(seconds(), __FILE__, __LINE__, "poll_wait(-1) end");
1124 /* } */
1125 }
1126 idle_time += seconds() - time;
1127 }
1128 }
1129 task_sys_deinit();
1130 }
1131
1132
init_sockaddr(char * server,struct sockaddr_in * sock_addr,socklen_t * sock_size,xcom_port port)1133 static int init_sockaddr(char *server, struct sockaddr_in *sock_addr,
1134 socklen_t *sock_size, xcom_port port)
1135 {
1136 /* Get address of server */
1137 struct addrinfo *addr = 0;
1138
1139 checked_getaddrinfo(server, 0, 0, &addr);
1140
1141 if (!addr)
1142 return 0;
1143
1144 /* Copy first address */
1145 memcpy(sock_addr, addr->ai_addr, addr->ai_addrlen);
1146 *sock_size = addr->ai_addrlen;
1147 sock_addr->sin_port = htons(port);
1148
1149 /* Clean up allocated memory by getaddrinfo */
1150 freeaddrinfo(addr);
1151
1152 return 1;
1153 }
1154
1155
1156 #if TASK_DBUG_ON
print_sockaddr(struct sockaddr * a)1157 static void print_sockaddr(struct sockaddr *a)
1158 {
1159 u_int i;
1160 GET_GOUT;
1161 NDBG(a->sa_family,u); NDBG(a->sa_family,d);
1162 STRLIT(" data ");
1163 for (i = 0; i < sizeof(a->sa_data); i++) {
1164 NPUT((unsigned char)a->sa_data[i],d);
1165 }
1166 PRINT_GOUT;
1167 FREE_GOUT;
1168 }
1169 #endif
1170
1171
connect_tcp(char * server,xcom_port port,int * ret)1172 int connect_tcp(char *server, xcom_port port, int *ret)
1173 {
1174 DECL_ENV
1175 int fd;
1176 struct sockaddr sock_addr;
1177 socklen_t sock_size;
1178 END_ENV;
1179 TASK_BEGIN;
1180 DBGOUT(FN; STREXP(server); NDBG(port,d));
1181 /* Create socket */
1182 if ((ep->fd = xcom_checked_socket(AF_INET, SOCK_STREAM, 0).val) < 0) {
1183 DBGOUT(FN; NDBG(ep->fd, d));
1184 TASK_FAIL;
1185 }
1186 /* Make it non-blocking */
1187 unblock_fd(ep->fd);
1188 /* Get address of server */
1189 /* OHKFIX Move this before call to xcom_checked_socket and use addrinfo fields as params to socket call */
1190 if (!init_sockaddr(server, (struct sockaddr_in *)&ep->sock_addr,
1191 &ep->sock_size, port)) {
1192 DBGOUT(FN; NDBG(ep->fd, d); NDBG(ep->sock_size, d));
1193 TASK_FAIL;
1194 }
1195 #if TASK_DBUG_ON
1196 DBGOUT(FN; print_sockaddr(&ep->sock_addr));
1197 #endif
1198 /* Connect socket to address */
1199 {
1200 result sock = {0,0};
1201 SET_OS_ERR(0);
1202 sock.val = connect(ep->fd, &ep->sock_addr, ep->sock_size);
1203 sock.funerr = to_errno(GET_OS_ERR);
1204 if (sock.val < 0) {
1205 if (hard_connect_err(sock.funerr)) {
1206 task_dump_err(sock.funerr);
1207 MAY_DBG(FN;
1208 NDBG(ep->fd, d);
1209 NDBG(ep->sock_size,d));
1210 #if TASK_DBUG_ON
1211 DBGOUT(FN; print_sockaddr(&ep->sock_addr));
1212 #endif
1213 DBGOUT(FN; NDBG(ep->fd, d); NDBG(ep->sock_size, d));
1214 close_socket(&ep->fd);
1215 TASK_FAIL;
1216 }
1217 }
1218 /* Wait until connect has finished */
1219 retry:
1220 timed_wait_io(stack, ep->fd, 'w', 10.0);
1221 TASK_YIELD;
1222 /* See if we timed out here. If we did, connect may or may not be active.
1223 If closing fails with EINPROGRESS, we need to retry the select.
1224 If close does not fail, we know that connect has indeed failed, and we
1225 exit from here and return -1 as socket fd */
1226 if(stack->interrupt){
1227 result shut = {0,0};
1228 stack->interrupt = 0;
1229
1230 /* Try to close socket on timeout */
1231 shut = shut_close_socket(&ep->fd);
1232 DBGOUT(FN; NDBG(ep->fd, d); NDBG(ep->sock_size, d));
1233 task_dump_err(shut.funerr);
1234 if (from_errno(shut.funerr) == SOCK_EINPROGRESS)
1235 goto retry; /* Connect is still active */
1236 TASK_FAIL; /* Connect has failed */
1237 }
1238
1239
1240 {
1241 int peer = 0;
1242 /* Sanity check before return */
1243 SET_OS_ERR(0);
1244 sock.val = peer = getpeername(ep->fd, &ep->sock_addr, &ep->sock_size);
1245 sock.funerr = to_errno(GET_OS_ERR);
1246 if (peer >= 0) {
1247 TASK_RETURN(ep->fd);
1248 } else {
1249 /* Something is wrong */
1250 socklen_t errlen = sizeof(sock.funerr);
1251
1252 getsockopt(ep->fd, SOL_SOCKET, SO_ERROR, (void * ) & sock.funerr, &errlen);
1253 if (sock.funerr == 0) {
1254 sock.funerr = to_errno(SOCK_ECONNREFUSED);
1255 }
1256
1257 shut_close_socket(&ep->fd);
1258 if (sock.funerr == 0)
1259 sock.funerr = to_errno(SOCK_ECONNREFUSED);
1260 TASK_FAIL;
1261 }
1262 }
1263 }
1264 FINALLY
1265 TASK_END;
1266 }
1267
set_nodelay(int fd)1268 result set_nodelay(int fd)
1269 {
1270 int n = 1;
1271 result ret = {0,0};
1272
1273 do{
1274 SET_OS_ERR(0);
1275 ret.val = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) & n, sizeof n);
1276 ret.funerr = to_errno(GET_OS_ERR);
1277 DBGOUT(FN; NDBG(from_errno(ret.funerr),d));
1278 }while(ret.val < 0 && can_retry(ret.funerr));
1279 return ret;
1280 }
1281
create_server_socket()1282 static result create_server_socket()
1283 {
1284 result fd ={0,0};
1285 /* Create socket */
1286 if ((fd = xcom_checked_socket(PF_INET, SOCK_STREAM, 0)).val < 0) {
1287 G_MESSAGE("Unable to create socket "
1288 "(socket=%d, errno=%d)!",
1289 fd.val, to_errno(GET_OS_ERR));
1290 return fd;
1291 }
1292 {
1293 int reuse = 1;
1294 SET_OS_ERR(0);
1295 if (setsockopt(fd.val, SOL_SOCKET, SOCK_OPT_REUSEADDR, (void * ) & reuse, sizeof(reuse)) < 0) {
1296 fd.funerr = to_errno(GET_OS_ERR);
1297 G_MESSAGE("Unable to set socket options "
1298 "(socket=%d, errno=%d)!",
1299 fd.val, to_errno(GET_OS_ERR));
1300 close_socket(&fd.val);
1301 return fd;
1302 }
1303 }
1304 return fd;
1305 }
1306
1307
init_server_addr(struct sockaddr_in * sock_addr,xcom_port port)1308 static void init_server_addr(struct sockaddr_in *sock_addr, xcom_port port)
1309 {
1310 memset(sock_addr, 0, sizeof(*sock_addr));
1311 sock_addr->sin_family = PF_INET;
1312 sock_addr->sin_port = htons(port);
1313 }
1314
1315
announce_tcp(xcom_port port)1316 result announce_tcp(xcom_port port)
1317 {
1318 result fd;
1319 struct sockaddr_in sock_addr;
1320
1321 fd = create_server_socket();
1322 if (fd.val < 0) {
1323 return fd;
1324 }
1325 init_server_addr(&sock_addr, port);
1326 if (bind(fd.val, (struct sockaddr *)&sock_addr, sizeof(sock_addr)) < 0) {
1327 int err = to_errno(GET_OS_ERR);
1328 G_MESSAGE("Unable to bind to %s:%d (socket=%d, errno=%d)!",
1329 "0.0.0.0", port, fd.val, err);
1330 goto err;
1331 }
1332 G_DEBUG("Successfully bound to %s:%d (socket=%d).",
1333 "0.0.0.0", port, fd.val);
1334 if (listen(fd.val, 32) < 0) {
1335 int err = to_errno(GET_OS_ERR);
1336 G_MESSAGE("Unable to listen backlog to 32. "
1337 "(socket=%d, errno=%d)!", fd.val, err);
1338 goto err;
1339 }
1340 G_DEBUG("Successfully set listen backlog to 32 "
1341 "(socket=%d)!", fd.val);
1342 /* Make socket non-blocking */
1343 unblock_fd(fd.val);
1344 if (fd.val < 0)
1345 {
1346 int err = to_errno(GET_OS_ERR);
1347 G_MESSAGE("Unable to unblock socket (socket=%d, errno=%d)!",
1348 fd.val, err);
1349 }
1350 else
1351 {
1352 G_DEBUG("Successfully unblocked socket (socket=%d)!", fd.val);
1353 }
1354 return fd;
1355
1356 err:
1357 fd.funerr = to_errno(GET_OS_ERR);
1358 task_dump_err(fd.funerr);
1359 close_socket(&fd.val);
1360 return fd;
1361 }
1362
1363
accept_tcp(int fd,int * ret)1364 int accept_tcp(int fd, int *ret)
1365 {
1366 struct sockaddr sock_addr;
1367 DECL_ENV
1368 int connection;
1369 END_ENV;
1370 TASK_BEGIN;
1371 /* Wait for connection attempt */
1372 wait_io(stack, fd, 'r');
1373 TASK_YIELD;
1374 /* Spin on benign error code */
1375 {
1376 socklen_t size = sizeof sock_addr;
1377 result res = {0,0};
1378 do{
1379 SET_OS_ERR(0);
1380 res.val = ep->connection = accept(fd, (void * ) & sock_addr, &size);
1381 res.funerr = to_errno(GET_OS_ERR);
1382 }while(res.val < 0 && from_errno(res.funerr) == SOCK_EINTR);
1383
1384 if (ep->connection < 0) {
1385 TASK_FAIL;
1386 }
1387 }
1388 #if TASK_DBUG_ON
1389 DBGOUT(FN; print_sockaddr(&sock_addr));
1390 #endif
1391 TASK_RETURN(ep->connection);
1392 FINALLY
1393 TASK_END;
1394 }
1395
1396
1397 #define STAT_INTERVAL 1.0
1398
1399 #if 0
1400
1401 /*
1402 This was disabled to prevent unecessary build warnings.
1403
1404 TODO:
1405 Needs to be assessed whether it should be removed altogether.
1406 */
1407
1408 static int statistics_task(task_arg arg)
1409 {
1410 DECL_ENV
1411 double next;
1412 END_ENV;
1413 TASK_BEGIN
1414 idle_time = 0.0;
1415 send_count = 0;
1416 receive_count = 0;
1417 send_bytes = 0;
1418 receive_bytes = 0;
1419 ep->next = seconds() + STAT_INTERVAL;
1420 TASK_DELAY_UNTIL(ep->next);
1421 for(;;) {
1422 G_DEBUG("task system idle %f send/s %f receive/s %f send b/s %f receive b/s %f",
1423 (idle_time / STAT_INTERVAL) * 100.0, send_count / STAT_INTERVAL, receive_count / STAT_INTERVAL,
1424 send_bytes / STAT_INTERVAL, receive_bytes / STAT_INTERVAL);
1425 idle_time = 0.0;
1426 send_count = 0;
1427 receive_count = 0;
1428 send_bytes = 0;
1429 receive_bytes = 0;
1430 ep->next += STAT_INTERVAL;
1431 TASK_DELAY_UNTIL(ep->next);
1432 }
1433 FINALLY
1434 TASK_END;
1435 }
1436 #endif
1437
init_task_vars()1438 static void init_task_vars()
1439 {
1440 stack = 0;
1441 task_errno = 0;
1442 }
1443
task_sys_init()1444 void task_sys_init()
1445 {
1446 DBGOUT(FN; NDBG(FD_SETSIZE,d));
1447 init_task_vars();
1448 link_init(&tasks, type_hash("task_env"));
1449 link_init(&free_tasks, type_hash("task_env"));
1450 link_init(&ash_nazg_gimbatul, type_hash("task_env"));
1451 /* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
1452 /* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
1453 iotasks_init(&iot);
1454 seconds(); /* Needed to initialize _now */
1455 /* task_new(statistics_task, null_arg, "statistics_task", 1); */
1456 }
1457
1458
task_sys_deinit()1459 static void task_sys_deinit()
1460 {
1461 DBGOUT(FN);
1462 iotasks_deinit(&iot);
1463 }
1464
1465 /* purecov: begin deadcode */
is_running(task_env * t)1466 int is_running(task_env *t)
1467 {
1468 return t && t->terminate == RUN;
1469 }
1470 /* purecov: end */
1471
set_task(task_env ** p,task_env * t)1472 void set_task(task_env**p, task_env *t)
1473 {
1474 if (t)
1475 task_ref(t);
1476 if (*p)
1477 task_unref(*p);
1478 *p = t;
1479 }
1480
1481
task_name()1482 const char *task_name()
1483 {
1484 return stack ? stack->name : "idle";
1485 }
1486
1487
1488 task_event task_events[MAX_TASK_EVENT];
1489 int cur_task_event;
1490 int max_task_event;
1491
1492 #ifdef WIN
1493 #define snprintf(...) _snprintf(__VA_ARGS__)
1494 #endif
1495
1496 /* purecov: begin deadcode */
ev_print(task_event te)1497 void ev_print(task_event te)
1498 {
1499 enum {
1500 bufsize = 10000
1501 };
1502 static char buf[bufsize];
1503 static size_t pos = 0;
1504
1505 if (te.pad) {
1506 switch (te.arg.type) {
1507 case a_int:
1508 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%d ", te.arg.val.i);
1509 break;
1510 case a_long:
1511 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%ld ", te.arg.val.l);
1512 break;
1513 case a_uint:
1514 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%u ", te.arg.val.u_i);
1515 break;
1516 case a_ulong:
1517 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%lu ", te.arg.val.u_l);
1518 break;
1519 case a_ulong_long:
1520 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%llu ", te.arg.val.u_ll);
1521 break;
1522 case a_float:
1523 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%f ", te.arg.val.f);
1524 break;
1525 case a_double:
1526 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%f ", te.arg.val.d);
1527 break;
1528 case a_void:
1529 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%p ", te.arg.val.v);
1530 break;
1531 case a_string:
1532 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%s ", te.arg.val.s);
1533 break;
1534 case a_end:
1535 xcom_log(LOG_TRACE, buf);
1536 pos = 0;
1537 break;
1538 default:
1539 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "??? ");
1540 }
1541 } else {
1542 switch (te.arg.type) {
1543 case a_int:
1544 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%d", te.arg.val.i);
1545 break;
1546 case a_long:
1547 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%ld", te.arg.val.l);
1548 break;
1549 case a_uint:
1550 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%u", te.arg.val.u_i);
1551 break;
1552 case a_ulong:
1553 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%lu", te.arg.val.u_l);
1554 break;
1555 case a_ulong_long:
1556 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%llu", te.arg.val.u_ll);
1557 break;
1558 case a_float:
1559 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%f", te.arg.val.f);
1560 break;
1561 case a_double:
1562 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%f", te.arg.val.d);
1563 break;
1564 case a_void:
1565 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%p", te.arg.val.v);
1566 break;
1567 case a_string:
1568 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%s", te.arg.val.s);
1569 break;
1570 case a_end:
1571 xcom_log(LOG_TRACE, buf);
1572 pos = 0;
1573 break;
1574 default:
1575 pos += (size_t)snprintf(&buf[pos], bufsize - pos, "???");
1576 }
1577 }
1578 buf[pos] = 0;
1579 }
1580
1581
add_event(task_arg te)1582 void add_event(task_arg te)
1583 {
1584 task_events[cur_task_event].arg = te;
1585 task_events[cur_task_event].pad = 1;
1586
1587 cur_task_event++;
1588 if (cur_task_event > max_task_event)
1589 max_task_event = cur_task_event;
1590 cur_task_event %= MAX_TASK_EVENT;
1591 }
1592
1593
add_unpad_event(task_arg te)1594 void add_unpad_event(task_arg te)
1595 {
1596 task_events[cur_task_event].arg = te;
1597 task_events[cur_task_event].pad = 0;
1598
1599 cur_task_event++;
1600 if (cur_task_event > max_task_event)
1601 max_task_event = cur_task_event;
1602 cur_task_event %= MAX_TASK_EVENT;
1603 }
1604
1605
add_base_event(double when,char const * file,int state)1606 void add_base_event(double when, char const *file, int state)
1607 {
1608 static double t = 0.0;
1609
1610 add_event(double_arg(when));
1611 add_event(double_arg(when - t));
1612 t = when;
1613 add_unpad_event(string_arg(file));
1614 add_unpad_event(string_arg(":"));
1615 add_event(int_arg(state));
1616 }
1617
1618
add_task_event(double when,char const * file,int state,char const * what)1619 void add_task_event(double when, char const *file, int state, char const *what)
1620 {
1621 add_base_event(when, file, state);
1622 add_event(string_arg(what));
1623 add_event(end_arg());
1624 }
1625
1626
add_wait_event(double when,char * file,int state,char * what,int milli)1627 void add_wait_event(double when, char *file, int state, char *what, int milli)
1628 {
1629 add_base_event(when, file, state);
1630 add_event(string_arg(what));
1631
1632 add_event(string_arg("milli"));
1633 add_event(int_arg(milli));
1634 add_event(end_arg());
1635 }
1636
1637
dump_range(double t MY_ATTRIBUTE ((unused)),int start,int end)1638 static void dump_range(double t MY_ATTRIBUTE((unused)
1639 ), int start, int end)
1640 {
1641 int i;
1642 for (i = start; i < end; i++) {
1643 ev_print(task_events[i]);
1644 }
1645 }
1646
1647
dump_task_events()1648 void dump_task_events()
1649 {
1650 double t = 0.0;
1651 G_DEBUG("cur_task_event %d max_task_event %d", cur_task_event, max_task_event);
1652 add_event(end_arg());
1653 dump_range(t, cur_task_event, max_task_event);
1654 dump_range(t, 0, cur_task_event);
1655 }
1656 /* purecov: end */
1657