1 /* Copyright (c) 2012, 2018, Oracle and/or its affiliates. All rights reserved.
2 
3    This program is free software; you can redistribute it and/or modify
4    it under the terms of the GNU General Public License, version 2.0,
5    as published by the Free Software Foundation.
6 
7    This program is also distributed with certain software (including
8    but not limited to OpenSSL) that is licensed under separate terms,
9    as designated in a particular file or component or in included license
10    documentation.  The authors of MySQL hereby grant you an additional
11    permission to link the program and your derivative works with the
12    separately licensed software that they have included with MySQL.
13 
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License, version 2.0, for more details.
18 
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, write to the Free Software
21    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301  USA */
22 
23 /**
24   @file task.c
25   Rudimentary, non-preemptive task system in portable C,
26   based on Tom Duff's switch-based coroutine trick
27   and a stack of environment structs. (continuations?)
28   Nonblocking IO and event handling need to be rewritten for each new OS.
29   The code is not MT-safe, but could be made safe by moving all global
30   variables into a context struct which could be the first parameter
31   to all the functions.
32 */
33 #if defined(linux) && !defined(_XOPEN_SOURCE)
34 #define _XOPEN_SOURCE
35 #endif
36 
37 #if defined(linux) && !defined(_GNU_SOURCE)
38 #define _GNU_SOURCE
39 #endif
40 
41 #include "x_platform.h"
42 
43 #ifdef XCOM_HAVE_OPENSSL
44 #include "openssl/ssl.h"
45 #include "openssl/err.h"
46 #endif
47 
48 #include <limits.h>
49 #include <stdlib.h>
50 #include "xcom_vp.h"
51 #include "node_connection.h"
52 
53 #ifndef WIN
54 #include <arpa/inet.h>
55 #endif
56 
57 #include <sys/types.h>
58 #include <sys/time.h>
59 #include <stdio.h>
60 #include <assert.h>
61 #include <errno.h>
62 #include <string.h>
63 #include <fcntl.h>
64 
65 #include "task_debug.h"
66 #include "task_net.h"
67 #include "simset.h"
68 #include "task_os.h"
69 #include "task.h"
70 #include "xcom_cfg.h"
71 #ifndef _WIN32
72 #include <sys/poll.h>
73 #endif
74 
75 #include "retry.h"
76 #include "xdr_utils.h"
77 
78 extern char *pax_op_to_str(int x);
79 
task_arg null_arg = {a_end,{0}}; /* Sentinel "no argument" value for tasks */

struct iotasks ;
typedef struct iotasks iotasks;

/* xdr_utils-style dynamic array of pollfd structs (length + data pointer) */
typedef struct {
  u_int pollfd_array_len;
  pollfd *pollfd_array_val;
} pollfd_array;

typedef task_env* task_env_p;

/* xdr_utils-style dynamic array of task_env pointers, kept parallel to
   the pollfd_array above */
typedef struct {
  u_int task_env_p_array_len;
  task_env_p *task_env_p_array_val;
} task_env_p_array;

/* Expand init/free/set/get accessor functions for the two array types
   (macros from xdr_utils.h) */
init_xdr_array(pollfd)
free_xdr_array(pollfd)
set_xdr_array(pollfd)
get_xdr_array(pollfd)
init_xdr_array(task_env_p)
free_xdr_array(task_env_p)
set_xdr_array(task_env_p)
get_xdr_array(task_env_p)

/* Bookkeeping for tasks blocked on IO: fd entry i is polled on behalf of
   tasks entry i; nwait is the number of live entries in both arrays. */
struct iotasks {
  int nwait;
  pollfd_array fd;
  task_env_p_array tasks;
};
111 
int	task_errno = 0; /* Task-level error code, analogous to errno */
/* Forward declarations for helpers defined later in this file */
static task_env *extract_first_delayed();
static task_env *task_ref(task_env *t);
static task_env *task_unref(task_env *t);
static void wake_all_io();
static void task_sys_deinit();

/* Return time as seconds */
static double	_now = 0.0; /* Wall-clock cache, refreshed by seconds() */
121 
task_now()122 double	task_now()
123 {
124 	return _now;
125 }
126 
127 
#ifdef WIN
/* Windows replacement for POSIX gettimeofday(), built on GetSystemTime().
   The timezone argument tzp is ignored.  Always returns 0. */
int gettimeofday(struct timeval * tp, struct timezone * tzp)
{
    /* Offset from the FILETIME epoch (1601-01-01) to the Unix epoch
       (1970-01-01), expressed in 100-nanosecond ticks */
    static uint64_t const EPOCH = ((uint64_t) 116444736000000000ULL);

    SYSTEMTIME  system_time;
    FILETIME    file_time;
    uint64_t    time;

    GetSystemTime( &system_time );
    SystemTimeToFileTime( &system_time, &file_time );
    /* Assemble the 64-bit tick count from the two 32-bit halves */
    time =  ((uint64_t)file_time.dwLowDateTime )      ;
    time += ((uint64_t)file_time.dwHighDateTime) << 32;

    /* 100ns ticks since 1601 -> seconds since 1970; milliseconds from
       the SYSTEMTIME give the sub-second part */
    tp->tv_sec  = (long) ((time - EPOCH) / 10000000L);
    tp->tv_usec = (long) (system_time.wMilliseconds * 1000);
    return 0;
}
#endif
147 
seconds(void)148 double	seconds(void)
149 {
150 	struct timeval tv;
151 	if (gettimeofday(&tv, 0) < 0)
152 		return - 1.0;
153 	return _now = (double)tv.tv_sec + (double)tv.tv_usec / 1000000.0;
154 }
155 
156 
#ifdef NOTDEF
/* The helpers in this region are compiled out (NOTDEF is never defined).
   They are kept as debugging aids for the timer heap below. */

/* Reset the timer queue to empty */
static void task_queue_init(task_queue *q)
{
       q->curn = 0;
}

/* Dump every element of the timer queue (1-based heap layout) */
static void task_queue_debug(task_queue *q)
{
	int	i;
	GET_GOUT;
	STRLIT("task_queue_debug ");
	for (i = 1; i <= q->curn; i++) {
		NDBG(i, d);
		PTREXP(q->x[i]);
		STREXP(q->x[i]->name);
		NDBG(q->x[i]->heap_pos, d);
		NDBG(q->x[i]->terminate, d);
		NDBG(q->x[i]->time, f);
	}
  PRINT_GOUT;
  FREE_GOUT;
}


/* Verify the min-heap invariant (child time >= parent time) and that every
   element's stored heap_pos matches its index; dump the queue on failure */
static int is_heap(task_queue *q)
{
	if (q->curn) {
		int	i;
		for (i = q->curn; i > 1; i--) {
			if ((q->x[i]->time < q->x[i/2]->time) ||
			    (q->x[i]->heap_pos != i)) {
				task_queue_debug(q);
				return 0;
			}
		}
		if (q->x[1]->heap_pos != 1) {
			task_queue_debug(q);
			return 0;
		}
	}
	return 1;
}

/* True when the timer queue cannot take another element */
static int     task_queue_full(task_queue *q)
{
       /* assert(is_heap(q)); */
       return q->curn >= MAXTASKS;
}


#endif
208 
/* Record element i's heap index inside the element itself */
#define FIX_POS(i) q->x[i]->heap_pos = (i)
/* #define TASK_SWAP(x,y) { task_env *tmp = (x); (x) = (y); (y) = (tmp); } */
/* Swap heap slots i and j, keeping the stored heap positions in sync */
#define TASK_SWAP(i,j) { task_env *tmp = q->x[i]; q->x[i] = q->x[j]; q->x[j] = tmp; FIX_POS(i); FIX_POS(j); }
/* Overwrite slot i with slot j's element (j's old slot is the caller's
   responsibility) */
#define TASK_MOVE(i,j) { q->x[i] = q->x[j]; FIX_POS(i); }
/* Put the task_env* at index n in its right place when Heap(1,n-1) */
static void task_queue_siftup(task_queue *q, int n)
{
	int	i = n;
	int	p;
	assert(n >= 0);
	/* Heap(1,n-1) */
	for (; ; ) {
		if (i == 1)
			break; /* Reached root */
		p = i / 2; /* Find parent */
		if (q->x[p]->time <= q->x[i]->time)
			break; /* We have reached correct place */
		TASK_SWAP(p, i);
		i = p;
	}
	/* Heap(1,n) */
}
231 
232 
233 /*   Put the task_env* at index l in its right place when Heap(l+1,n) */
/*   Put the task_env* at index l in its right place when Heap(l+1,n) */
static void task_queue_siftdown(task_queue *q, int l, int n)
{
	int	i = l;
	int	c;
	assert(n >= 0);
	/* Heap(l+1,,n) */
	for (; ; ) {
		c = 2 * i; /* First child */
		if (c > n)
			break; /* Outside heap */
		if (c + 1 <= n) /* We have second child */
			if (q->x[c+1]->time < q->x[c]->time) /* Select lesser child */
				c++;
		/* c now indexes the smaller of the two children */
		if (q->x[i]->time <= q->x[c]->time)
			break; /* We have reached correct place */
		TASK_SWAP(c, i);
		i = c;
	}
	/* Heap(l,n) */
}
254 
255 
256 /* Remove any element from the heap */
/* Remove any element from the heap */
/* Removes the element at heap index i, re-establishes the heap
   invariant, and drops the heap's reference on the removed task.
   Returns the result of task_unref() (NULL if that was the last ref). */
static task_env *task_queue_remove(task_queue *q, int i)
{
	task_env * tmp = q->x[i]; /* Will return this */
	assert(q->curn);
	/* assert(is_heap(q)); */
	MAY_DBG(FN; STRLIT("task_queue_remove "); NDBG(i, d));
	/* task_queue_debug(q); */
	TASK_MOVE(i, q->curn); /* Fill the hole */
	q->curn--; /* Heap is now smaller */
	/* Re-establish heap property */
	if (q->curn) {
		int	p = i / 2;
		/* The moved-in element may be smaller than its parent (sift up)
		   or larger than its children (sift down) */
		if (p && q->x[p]->time > q->x[i]->time)
			task_queue_siftup(q, i);
		else
			task_queue_siftdown(q, i, q->curn);
	}
	/* task_queue_debug(q); */
	/* assert(is_heap(q)); */
	tmp->heap_pos = 0; /* Mark as no longer queued */
	return task_unref(tmp);
}
279 
280 
281 /* Insert task_env * in queue */
/* Insert task_env * in queue */
/* t must not already be in the heap (heap_pos == 0); it is appended
   at the end and sifted up to its correct position. */
static void task_queue_insert(task_queue *q, task_env *t)
{
	assert(t->heap_pos == 0);
	assert(q->curn < MAXTASKS);
	/* assert(is_heap(q)); */
	q->curn++;
	q->x[q->curn] = t;
	FIX_POS(q->curn);
	/* Heap(1,n-1) */
	task_queue_siftup(q, q->curn);
	/* Heap(1,n) */
	/* assert(is_heap(q)); */
}
295 
296 
/* True when the timer heap holds no elements (heap occupies x[1..curn]). */
static int	task_queue_empty(task_queue *q)
{
	/* assert(is_heap(q)); */
	return !(q->curn >= 1);
}
302 
303 
task_queue_min(task_queue * q)304 static task_env *task_queue_min(task_queue *q)
305 {
306 	/* assert(is_heap(q)); */
307 	assert(q->curn >= 1);
308 	return q->x[1];
309 }
310 
311 
312 /* Extract first task_env * from queue */
/* Extract first task_env * from queue */
/* Removes and returns the heap root (earliest deadline). Unlike
   task_queue_remove() this does NOT unref the task; the caller
   owns the heap's reference afterwards. */
static task_env *task_queue_extractmin(task_queue *q)
{
	task_env * tmp;
	assert(q);
	assert(q->curn >= 1);
	/* assert(is_heap(q)); */
	/* task_queue_debug(q); */
	tmp = q->x[1];
	TASK_MOVE(1, q->curn); /* Move last element into the root slot */
	q->x[q->curn] = 0;
	q->curn--;
	/* Heap(2,n) */
	if (q->curn)
		task_queue_siftdown(q, 1, q->curn);
	/* Heap(1,n) */
	/* task_queue_debug(q); */
	/* assert(is_heap(q)); */
	tmp->heap_pos = 0; /* Mark as no longer queued */
	return tmp;
}
333 
334 
static linkage ash_nazg_gimbatul = {0,&ash_nazg_gimbatul,&ash_nazg_gimbatul}; /* One ring to bind them all */

static void task_init(task_env *p);
void *task_allocate(task_env *p, unsigned int bytes);
/**
   Initialize task memory
*/
/* Resets all bookkeeping in t, links it into the global list of all
   tasks (ash_nazg_gimbatul), and zeroes its private stack/pool.
   The buf array serves double duty: allocations grow up from buf[0]
   via 'where', the pointer stack grows down from buf[TASK_POOL_ELEMS-1]
   via 'stack_top'. */
static void task_init(task_env *t)
{
	link_init(&t->l, type_hash("task_env"));
	link_init(&t->all, type_hash("task_env"));
	t->heap_pos = 0; /* Not in the timer heap */
	/* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
	/* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
	assert(ash_nazg_gimbatul.type == type_hash("task_env"));
	/* #ifdef __sun */
	/*   mem_watch(&ash_nazg_gimbatul,sizeof(&ash_nazg_gimbatul), 0); */
	/* #endif */
	link_into(&t->all, &ash_nazg_gimbatul); /* Put it in the list of all tasks */
	/* #ifdef __sun */
	/*   mem_watch(&ash_nazg_gimbatul,sizeof(&ash_nazg_gimbatul), WA_WRITE); */
	/* #endif */
	assert(ash_nazg_gimbatul.type == type_hash("task_env"));
	/* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
	/* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
	t->terminate = RUN;
	t->refcnt = 0;
	t->taskret = 0;
	t->time = 0.0;
	t->arg = null_arg;
	t->where = t->buf; /* Allocation mark starts at the bottom */
	t->stack_top = &t->buf[TASK_POOL_ELEMS-1]; /* Pointer stack starts at the top */
	t->sp = t->stack_top;
	memset(t->buf, 0, TASK_POOL_ELEMS * sizeof(TaskAlign));
}
370 
371 
static linkage tasks = {0,&tasks,&tasks}; /* List of runnable tasks */
static task_queue task_time_q; /* Min-heap of tasks waiting for a timeout */
static linkage free_tasks = {0,&free_tasks,&free_tasks}; /* Recycled task_envs (see task_delete) */
/* Basic operations on tasks */
/* Make t runnable: if it was waiting in the timer heap, remove it from
   there first, then link it onto the run list. Returns t (may be NULL). */
static task_env *activate(task_env *t)
{
	if (t) {
		MAY_DBG(FN;
		    STRLIT("activating task ");
		    PTREXP(t);
		    STREXP(t->name);
		    NDBG(t->heap_pos, d);
		    NDBG(t->time, f);
		    );
		assert(ash_nazg_gimbatul.type == type_hash("task_env"));
		if (t->heap_pos) /* Still queued for a timeout? */
			task_queue_remove(&task_time_q, t->heap_pos);
		link_into(&t->l, &tasks);
		t->time = 0.0;
		t->heap_pos = 0;
		assert(ash_nazg_gimbatul.type == type_hash("task_env"));
	}
	return t;
}
396 
397 
/* Take t off whatever run/wait list it is on. Returns t (may be NULL). */
static task_env *deactivate(task_env *t)
{
	if (t == NULL)
		return t;
	assert(ash_nazg_gimbatul.type == type_hash("task_env"));
	link_out(&t->l);
	assert(ash_nazg_gimbatul.type == type_hash("task_env"));
	return t;
}
407 
408 
task_delay_until(double time)409 void task_delay_until(double time)
410 {
411 	if (stack) {
412 		stack->time = time;
413 		task_queue_insert(&task_time_q, task_ref(deactivate(stack)));
414 	}
415 }
416 
417 
418 /* Wait queues */
/* Wait queues */
/* Block task t on the given wait queue: remove it from the run list
   and append it to the queue. No-op when t is NULL. */
void task_wait(task_env *t, linkage *queue)
{
	if (t == NULL)
		return;
	TASK_DEBUG("task_wait");
	deactivate(t);
	link_into(&t->l, queue);
}
427 
428 
/* Wake every task blocked on the given wait queue, moving all of them
   back onto the run list. */
void task_wakeup(linkage *queue)
{
	assert(queue);
	assert(queue != &tasks);
	for (;;) {
		linkage *first;
		if (link_empty(queue))
			break;
		first = link_extract_first(queue);
		activate(container_of(first, task_env, l));
		TASK_DEBUG("task_wakeup");
	}
}
438 
439 
/* Wake at most one task (the head) of the given wait queue. */
static void task_wakeup_first(linkage *queue)
{
	assert(queue);
	assert(queue != &tasks);
	if (link_empty(queue))
		return;
	activate(container_of(link_extract_first(queue), task_env, l));
	TASK_DEBUG("task_wakeup_first");
}
449 
450 
451 /* Channels */
/* Channels */
/* Prepare a channel for use: an empty data list carrying the caller's
   element type, and an empty queue of waiting task_envs. Returns c. */
channel *channel_init(channel *c, unsigned int type)
{
	channel *ret = c;
	link_init(&ret->data, type);
	link_init(&ret->queue, type_hash("task_env"));
	return ret;
}
458 
459 /* purecov: begin deadcode */
channel_new()460 channel *channel_new()
461 {
462 	channel * c = malloc(sizeof(channel));
463 	channel_init(c, NULL_TYPE);
464 	return c;
465 }
466 /* purecov: end */
467 
/* Append a data element to the channel, then wake one waiting reader
   (if any) so it can consume the element. */
void channel_put(channel *c, linkage *data)
{
	MAY_DBG(FN; PTREXP(data); PTREXP(&c->data));
	/* Enqueue at the tail of the data list */
	link_into(data, &c->data);
	task_wakeup_first(&c->queue);
}
474 
475 
/* Push a data element at the FRONT of the channel (LIFO delivery),
   then wake one waiting reader. */
void channel_put_front(channel *c, linkage *data)
{
	/* link_follow places data immediately after the list head */
	link_follow(data, &c->data);
	task_wakeup_first(&c->queue);
}
481 
482 
static int	active_tasks = 0; /* Count of live (not yet deleted) tasks */
/* Create a new task that will run func(arg); name is used for debugging
   output only (not copied — must outlive the task). The task is made
   runnable immediately and starts with one reference held by the system.
   NOTE(review): the malloc() result is used unchecked — an OOM would
   crash inside task_init(); confirm whether abort-on-OOM is the intended
   policy here as in task_allocate(). */
task_env *task_new(task_func func, task_arg arg, const char *name, int debug)
{
	task_env * t;
	/* Reuse a recycled task_env if one is available */
	if (link_empty(&free_tasks))
		t = malloc(sizeof(task_env));
	else
		t = container_of(link_extract_first(&free_tasks), task_env, l);
	DBGOUT(FN; PTREXP(t); STREXP(name));
	task_init(t);
	t->func = func;
	t->arg = arg;
	t->name = name;
	t->debug = debug;
	t->waitfd = -1; /* Not waiting on any fd */
	t->interrupt = 0;
	activate(t);
	task_ref(t);
	active_tasks++;
	return t;
}
504 
505 
506 /**Allocate bytes from pool, initialized to zero */
/**Allocate bytes from pool, initialized to zero */
void *task_allocate(task_env *p, unsigned int bytes)
{
	/* Round the request up to a whole number of TaskAlign units */
	unsigned int	units = (unsigned int)
		((bytes + sizeof(TaskAlign) - 1) / sizeof(TaskAlign));
	TaskAlign * ret = NULL;
	TASK_DEBUG("task_allocate");
	/* The pool grows up from 'where'; the pointer stack grows down from
	   'stack_top'. Abort if the two would collide. */
	if ((p->where + units) > (p->stack_top))
		abort();
	ret = p->where;
	p->where += units;
	memset(ret, 0, units * sizeof(TaskAlign));
	return ret;
}
525 
526 
/* Zero the coroutine state word stored just below the pointer-stack top,
   so the task restarts its TASK_BEGIN dispatch from the beginning.
   Aborts if the pool has overrun the stack. */
void reset_state(task_env *p)
{
	if ((p->where) > (p->stack_top - 1))
		abort();
	TASK_DEBUG("reset_state");
	p->stack_top[-1].state = 0;
}
536 
537 
/* Push a non-NULL pointer onto the task's private pointer stack
   (grows downward inside the buf pool); abort on overflow. */
void pushp(task_env *p, void *ptr)
{
	assert(ptr);
	if ((p->where) > (p->stack_top - 1))
		abort(); /* Would collide with the allocation area */
	p->stack_top->ptr = ptr;
	p->stack_top--;
	TASK_DEBUG("pushp");
}
549 
550 
/* Pop one entry off the task's private pointer stack; abort on underflow
   (stack_top already at the end of buf). */
void popp(task_env *p)
{
	if (p->stack_top >= &p->buf[TASK_POOL_ELEMS])
		abort();
	TASK_DEBUG("popp");
	p->stack_top++;
}
560 
561 
runnable_tasks()562 static int	runnable_tasks()
563 {
564 	return !link_empty(&tasks);
565 }
566 
567 
delayed_tasks()568 static int	delayed_tasks()
569 {
570 	return !task_queue_empty(&task_time_q);
571 }
572 
573 
/* Destroy a task: unlink it from the global list of all tasks, take it
   off any run/wait list, and release its memory. The #if 1 branch frees
   the struct outright; the disabled alternative would recycle it through
   the free_tasks list instead. Called from task_unref() when the last
   reference is dropped. */
static void task_delete(task_env *t)
{
	DBGOUT(FN; PTREXP(t); STREXP(t->name); NDBG(t->refcnt, d));
	link_out(&t->all); /* Remove task from list of all tasks */
#if 1
	free(deactivate(t)); /* Deactivate and free task */
#else
  deactivate(t);
	link_into(&t->l, &free_tasks);
#endif
	active_tasks--;
}
586 
587 
/* Take a reference on t (NULL-safe); returns t unchanged. */
static task_env *task_ref(task_env *t)
{
	if (t != NULL)
		t->refcnt++;
	return t;
}
595 
596 
/* Drop a reference on t (NULL-safe). When the count reaches zero the
   task is deleted and NULL is returned; otherwise returns t. */
static task_env *task_unref(task_env *t)
{
	if (t == NULL)
		return NULL;
	t->refcnt--;
	if (t->refcnt == 0) {
		task_delete(t);
		return NULL;
	}
	return t;
}
608 
609 
/* Public wrapper around activate(): make t runnable. */
task_env *task_activate(task_env *t)
{
	task_env *ret = activate(t);
	return ret;
}
614 
615 
/* Public wrapper around deactivate(): take t off its list. */
task_env *task_deactivate(task_env *t)
{
	task_env *ret = deactivate(t);
	return ret;
}
620 
621 
622 /* Set terminate flag and activate task */
/* Set terminate flag and activate task */
/* Ask t to die: flag it KILL and schedule it so it can observe the flag
   and clean up. NULL-safe; returns t. */
task_env *task_terminate(task_env *t) {
  if (t == NULL)
    return t;
  DBGOUT(FN; PTREXP(t); STREXP(t->name); NDBG(t->refcnt, d));
  t->terminate = KILL; /* Set terminate flag */
  activate(t);         /* and get it running */
  return t;
}
631 
632 
633 /* Call task_terminate on all tasks */
/* Call task_terminate on all tasks */
/* Shutdown path: first make every timed-out/waiting task runnable (so
   none is stranded in the timer heap or the poll table), then walk the
   global list of all tasks and flag each one KILL. */
void task_terminate_all()
{
	/* First, activate all tasks which wait for timeout */
	while (delayed_tasks()) {
		task_env * t = extract_first_delayed(); /* May be NULL */
		if (t)
			activate(t); /* Make it runnable */
	}
	/* Then wake all tasks waiting for IO */
	wake_all_io();
	/* At last, terminate everything */
	/* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
	/* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
	FWD_ITER(&ash_nazg_gimbatul, task_env,
	    task_terminate(container_of(link_iter, task_env, all));
	    );
}
651 
652 
first_delayed()653 static task_env *first_delayed()
654 {
655 	return task_queue_min(&task_time_q);
656 }
657 
658 
extract_first_delayed()659 static task_env *extract_first_delayed()
660 {
661 	task_env * ret = task_queue_extractmin(&task_time_q);
662 	ret->time = 0.0;
663 	return task_unref(ret);
664 }
665 
666 
667 static iotasks iot;
668 
iotasks_init(iotasks * iot)669 static void iotasks_init(iotasks *iot)
670  {
671   DBGOUT(FN);
672   iot->nwait = 0;
673   init_pollfd_array(&iot->fd);
674   init_task_env_p_array(&iot->tasks);
675  }
676 
/* Release the poll table's arrays and reset the waiter count. */
static void iotasks_deinit(iotasks *p)
{
  DBGOUT(FN);
  p->nwait = 0;
  free_pollfd_array(&p->fd);
  free_task_env_p_array(&p->tasks);
}
684 
#if TASK_DBUG_ON
/* Dump every entry of the poll table for debugging.
   NOTE(review): this indexes iot.tasks[i] and iot.fd[i] directly, but
   those members are xdr-style array structs (task_env_p_array /
   pollfd_array), not plain arrays — this looks stale and would likely
   not compile with TASK_DBUG_ON enabled; confirm before relying on it. */
static void poll_debug()
{
	int	i = 0;
	for (i = 0; i < iot.nwait; i++) {
		NDBG(i,d); PTREXP(iot.tasks[i]); NDBG(iot.fd[i].fd,d); NDBG(iot.fd[i].events,d); NDBG(iot.fd[i].revents,d);
	}
}
#endif
694 
/* Wake the task in poll slot i: activate it, drop the poll table's
   reference, then compact the parallel arrays by moving the last live
   entry into slot i. Order matters: nwait is decremented before the
   moves so that index nwait names the entry being relocated. Callers
   iterating the table must NOT advance their index after this call. */
static void poll_wakeup(int i)
{
  activate(task_unref(get_task_env_p(&iot.tasks,i)));
  set_task_env_p(&iot.tasks, NULL,i);
  iot.nwait--; /* Shrink array of pollfds */
  set_pollfd(&iot.fd, get_pollfd(&iot.fd,iot.nwait),i);
  set_task_env_p(&iot.tasks, get_task_env_p(&iot.tasks,iot.nwait),i);
}
703 
/* Poll all registered fds for at most ms milliseconds (clamped to
   [0,1000]) and wake every task whose fd became ready or whose IO
   timeout expired. Returns 1 if any task was woken, else 0.
   Aborts on any poll() failure other than EINTR. */
static int poll_wait(int ms) {
  result nfds = {0, 0};
  int wake = 0;

  /* Wait at most ms milliseconds */
  MAY_DBG(FN; NDBG(ms, d));
  if (ms < 0 || ms > 1000) ms = 1000; /* Wait at most 1000 ms */
  SET_OS_ERR(0);
  /* Retry poll() as long as it is interrupted by a signal */
  while ((nfds.val = poll(iot.fd.pollfd_array_val, iot.nwait, ms)) == -1) {
    nfds.funerr = to_errno(GET_OS_ERR);
    if (nfds.funerr != SOCK_EINTR) {
      task_dump_err(nfds.funerr);
      MAY_DBG(FN; STRLIT("poll failed"));
      abort();
    }
    SET_OS_ERR(0);
  }
  /* Wake up ready tasks */
  {
    int i = 0;
    int interrupt = 0;
    while (i < iot.nwait) {
      /* A non-zero task time is an IO deadline; past-due means timeout */
      interrupt =
        (get_task_env_p(&iot.tasks,i)->time != 0.0 &&
        get_task_env_p(&iot.tasks,i)->time < task_now());
      if (interrupt || /* timeout ? */
        get_pollfd(&iot.fd,i).revents) {
        /* if(iot.fd[i].revents & POLLERR) abort(); */
        get_task_env_p(&iot.tasks,i)->interrupt = interrupt;
        /* poll_wakeup() compacts the table into slot i, so no i++ here */
        poll_wakeup(i);
        wake = 1;
      } else {
        i++;
      }
    }
  }
  return wake;
}
742 
/* Register t as waiting for IO on fd. op 'r' selects read events
   (POLLIN|POLLRDNORM); any other value selects write (POLLOUT).
   The task is taken off the run list and the poll table holds a
   reference on it until poll_wakeup()/unpoll() releases it. */
static void add_fd(task_env *t, int fd, int op) {
  int events = 'r' == op ? POLLIN | POLLRDNORM : POLLOUT;
  MAY_DBG(FN; PTREXP(t); NDBG(fd, d); NDBG(op, d));
  assert(fd >= 0);
  t->waitfd = fd;
  deactivate(t);
  task_ref(t); /* Reference owned by the poll table */
  set_task_env_p(&iot.tasks, t, iot.nwait);
  {
    pollfd x;
    x.fd = fd;
    x.events = events;
    x.revents = 0;
    set_pollfd(&iot.fd, x, iot.nwait);
  }
  iot.nwait++; /* Entry becomes live only after both arrays are set */
}
760 
unpoll(int i)761 void unpoll(int i) {
762   task_unref(get_task_env_p(&iot.tasks, i));
763   set_task_env_p(&iot.tasks, NULL,i);
764   {
765     pollfd x;
766     x.fd = -1;
767     x.events = 0;
768     x.revents = 0;
769     set_pollfd(&iot.fd, x, i);
770   }
771 }
772 
wake_all_io()773 static void wake_all_io() {
774   int i;
775   for (i = 0; i < iot.nwait; i++) {
776     activate(get_task_env_p(&iot.tasks,i));
777     unpoll(i);
778   }
779   iot.nwait = 0;
780 }
781 
/* Wake every task waiting on fd (there may be more than one, e.g. one
   reader and one writer) and remove their poll entries. */
void remove_and_wakeup(int fd) {
  int i = 0;
  MAY_DBG(FN; NDBG(fd, d));
  while (i < iot.nwait) {
    if (get_pollfd(&iot.fd,i).fd == fd) {
      /* poll_wakeup() moves the last entry into slot i, so do NOT
         advance i here — the same slot must be re-examined. */
      poll_wakeup(i);
    } else {
      i++;
    }
  }
}
793 
794 task_env *stack = NULL;
795 
wait_io(task_env * t,int fd,int op)796 task_env *wait_io(task_env *t, int fd, int op)
797 {
798 	t->time = 0.0;
799 	t->interrupt = 0;
800 	add_fd(deactivate(t), fd, op);
801 	return t;
802 }
803 
804 
/* Park task t until fd is ready for the given op, or until 'timeout'
   seconds have passed (poll_wait sets t->interrupt on expiry). */
static task_env *timed_wait_io(task_env *t, int fd, int op, double timeout)
{
	t->interrupt = 0;
	t->time = task_now() + timeout; /* Absolute IO deadline */
	add_fd(deactivate(t), fd, op);
	return t;
}
812 
813 
static uint64_t send_count;    /* Completed task_write() calls */
static uint64_t receive_count; /* Completed task_read() calls */
static uint64_t send_bytes;    /* Total bytes written */
static uint64_t receive_bytes; /* Total bytes read */

#ifdef XCOM_HAVE_OPENSSL
/* Read at most n bytes from the connection into buf. Uses SSL_read()
   when the connection has an SSL descriptor, plain recv() otherwise.
   Returns the byte count (or a negative value) in .val and a normalized
   error code in .funerr. */
result	con_read(connection_descriptor const *rfd, void *buf, int n)
{
  result	ret = {0,0};

  if (rfd->ssl_fd) {
    /* Clear the SSL error queue so SSL_get_error() reflects this call */
    ERR_clear_error();
    ret.val = SSL_read(rfd->ssl_fd, buf, n);
    ret.funerr = to_ssl_err(SSL_get_error(rfd->ssl_fd, ret.val));
  } else {
    SET_OS_ERR(0);
    ret.val = (int)recv(rfd->fd, buf, (size_t)n, 0);
    ret.funerr = to_errno(GET_OS_ERR);
  }
  return ret;
}
#else
/* Same contract as the SSL-enabled variant above, but built without
   OpenSSL: always plain recv(). */
result	con_read(connection_descriptor const *rfd, void *buf, int n)
{
  result	ret = {0,0};

  SET_OS_ERR(0);
  ret.val = recv(rfd->fd, buf, (size_t)n, 0);
  ret.funerr = to_errno(GET_OS_ERR);

  return ret;
}
#endif
847 
848 /*
849   It just reads no more than INT_MAX bytes. Caller should call it again for
850   read more than INT_MAX bytes.
851 
852   Either the bytes written or an error number is returned to the caller through
853   'ret' argument. Error number is always negative integers.
854 */
/* Coroutine: read up to n bytes from con into buf, yielding (via
   TASK_YIELD) whenever the socket would block. The byte count or a
   negative error is delivered through *ret. The TASK_BEGIN/TASK_END
   macros implement Duff's-device re-entry, so all state that must
   survive a yield lives in DECL_ENV or the arguments. */
int	task_read(connection_descriptor const* con, void *buf, int n, int64_t *ret)
{
	DECL_ENV
		int dummy;
	END_ENV;

	result 	sock_ret = {0,0};
        *ret= 0;

	assert(n >= 0);

	TASK_BEGIN
		MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); PTREXP(buf); NDBG(n,d));

	for(;;){
		if(con->fd <= 0)
			TASK_FAIL; /* Connection already closed */
		sock_ret = con_read(con, buf, n);
		*ret = sock_ret.val;
		task_dump_err(sock_ret.funerr);
		MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); PTREXP(buf); NDBG(n,d));
		/* Done on success (including EOF) or any non-retryable error */
		if(sock_ret.val >= 0 || (! can_retry_read(sock_ret.funerr)))
			break;
		/* Would block: park this task until the fd is readable */
		wait_io(stack, con->fd, 'r');
		TASK_YIELD;
		MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); PTREXP(buf); NDBG(n,d));
	}

	assert(!can_retry_read(sock_ret.funerr));
	FINALLY
	    receive_count++;
	if(*ret > 0)
		receive_bytes += (uint64_t)(*ret);
	TASK_END;
}
890 
#ifdef XCOM_HAVE_OPENSSL
/* Write at most n (> 0) bytes from buf to the connection. Uses
   SSL_write() when the connection has an SSL descriptor, plain send()
   otherwise. Returns bytes written (or negative) in .val and a
   normalized error code in .funerr. */
result	con_write(connection_descriptor const *wfd, void *buf, int n)
{
  result	ret = {0,0};

  assert(n >0);

  if (wfd->ssl_fd) {
    /* Clear the SSL error queue so SSL_get_error() reflects this call */
    ERR_clear_error();
    ret.val = SSL_write(wfd->ssl_fd, buf, n);
    ret.funerr = to_ssl_err(SSL_get_error(wfd->ssl_fd, ret.val));
  } else {
    SET_OS_ERR(0);
    ret.val = (int)send(wfd->fd, buf, (size_t)n, 0);
    ret.funerr = to_errno(GET_OS_ERR);
  }
  return ret;
}
#else
/* Same contract as the SSL-enabled variant above, but built without
   OpenSSL: always plain send(). */
result	con_write(connection_descriptor const *wfd, void *buf, int n)
{
  result	ret = {0,0};

  assert(n >0);

  SET_OS_ERR(0);
  ret.val = send(wfd->fd, buf, (size_t)n, 0);
  ret.funerr = to_errno(GET_OS_ERR);
  return ret;
}
#endif
922 
923 /*
924   It writes no more than UINT_MAX bytes which is the biggest size of
925   paxos message.
926 
927   Either the bytes written or an error number is returned to the caller through
928   'ret' argument. Error number is always negative integers.
929 */
/* Coroutine: write exactly n bytes from _buf to con, yielding whenever
   the socket would block. Loops until all n bytes are written, the peer
   closes (TERMINATE), or a non-retryable error occurs (TASK_FAIL).
   The running total lives in the coroutine environment (ep->total) so
   it survives TASK_YIELD. */
int task_write(connection_descriptor const *con, void *_buf, uint32_t n,
               int64_t *ret)
{
	char	*buf = (char *) _buf;
	DECL_ENV
	   uint32_t	total; /* Keeps track of number of bytes written so far */
	END_ENV;
	result 	sock_ret = {0,0};

	TASK_BEGIN
	    ep->total = 0;
	*ret= 0;
	while (ep->total < n) {
		MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); NDBG(n - ep->total,u));
		for(;;){
			if(con->fd <= 0)
				TASK_FAIL; /* Connection already closed */
			/*
			  con_write can only write messages that their sizes don't exceed
			  INT_MAX bytes. We should never pass a length bigger than INT_MAX
			  to con_write.
			*/
			sock_ret = con_write(con, buf + ep->total, n-ep->total >= INT_MAX ?
                           INT_MAX : (int)(n-ep->total));
			task_dump_err(sock_ret.funerr);
			MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); NDBG(sock_ret.val, d));
			/* Leave retry loop on success or non-retryable error */
			if(sock_ret.val >= 0 || (! can_retry_write(sock_ret.funerr)))
				break;
			/* Would block: park until the fd is writable */
			wait_io(stack, con->fd, 'w');
			MAY_DBG(FN; PTREXP(stack); NDBG(con->fd,d); NDBG(n - ep->total,u));
			TASK_YIELD;
		}
		if (0 == sock_ret.val) { /* We have successfully written n bytes */
			TERMINATE;
		} else if (sock_ret.val < 0) { /* Something went wrong */
			TASK_FAIL;
		} else {
			/* Add number of bytes written to total */
			ep->total += (uint32_t)sock_ret.val;
		}
	}
	assert(ep->total == n);
	TASK_RETURN(ep->total);
	FINALLY
	    send_count++;
	send_bytes += ep->total;
	TASK_END;
}
978 
979 
/* Switch fd to non-blocking mode. Returns the last fcntl()/ioctlsocket()
   result (-1 on failure). */
int unblock_fd(int fd)
{
#if !defined(WIN32) && !defined(WIN64)
	/* Add O_NONBLOCK to the current descriptor flags */
	int	flags = fcntl(fd, F_GETFL, 0);
	flags = fcntl(fd, F_SETFL, flags | O_NONBLOCK);
	return flags;
#else
	/**
		   * On windows we toggle the FIONBIO flag directly
		   *
		   * Undocumented in MSDN:
		   * Calling ioctlsocket(FIONBIO) to an already set state
		   * seems to return -1 and WSAGetLastError() == 0.
		   */
	u_long nonblocking = 1; /** !0 == non-blocking */
	return ioctlsocket(fd, FIONBIO, &nonblocking);
#endif
}
998 
999 
/* Switch fd back to blocking mode. Returns the last fcntl()/ioctlsocket()
   result (-1 on failure). */
int block_fd(int fd)
{
#if !defined(WIN32) && !defined(WIN64)
	/* Strip O_NONBLOCK from the current descriptor flags */
	int	flags = fcntl(fd, F_GETFL, 0);
	flags = fcntl(fd, F_SETFL, flags & ~O_NONBLOCK);
	return flags;
#else
	/**
		   * On windows we toggle the FIONBIO flag directly.
		   *
		   * Undocumented in MSDN:
		   * Calling ioctlsocket(FIONBIO) to an already set state seems to
		   * return -1 and WSAGetLastError() == 0.
		   */
	u_long nonblocking = 0; /** 0 == blocking */
	return ioctlsocket(fd, FIONBIO, &nonblocking);
#endif
}
1018 
1019 /* purecov: begin deadcode */
is_only_task()1020 int	is_only_task()
1021 {
1022 	return link_first(&tasks) == link_last(&tasks);
1023 }
1024 /* purecov: end */
1025 
first_runnable()1026 static task_env *first_runnable()
1027 {
1028 	return (task_env * )link_first(&tasks);
1029 }
1030 
1031 
/* Successor of t on the run list. */
static task_env *next_task(task_env *t)
{
	return (task_env *)link_first(&t->l);
}
1036 
1037 
/* True when t is actually the run-list anchor, i.e. we have walked past
   the last real task. */
static int	is_task_head(task_env *t)
{
	return (&t->l == &tasks) ? 1 : 0;
}
1042 
1043 
msdiff(double time)1044 static int	msdiff(double time)
1045 {
1046 	return (int)(1000.5 * (first_delayed()->time - time));
1047 }
1048 
1049 
static double	idle_time = 0.0; /* Accumulated time spent waiting for events */
/* The scheduler: repeatedly run every runnable task one step, then sleep
   in poll() until the next timer expires or IO becomes ready. Exits
   (and tears down the task system) when no live tasks remain. */
void task_loop()
{
	/* While there are tasks */
	for(;;) {
		task_env * t = first_runnable();
		/* While runnable tasks */
		while (runnable_tasks()) {
			/* Fetch the successor first: t may deactivate/delete itself */
			task_env * next = next_task(t);
			if (!is_task_head(t)) {
				/* DBGOUT(FN; PTREXP(t); STRLIT(t->name ? t->name : "TASK WITH NO NAME")); */
				stack = t; /* Current task, visible to coroutine macros */
				assert(stack);
				assert(t->terminate != TERMINATED);
				if (stack->debug)
					/* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
					/* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
					assert(ash_nazg_gimbatul.type == type_hash("task_env"));
				 {
					/*           double when = seconds(); */
					int	val = 0;
					assert(t->func);
					assert(stack == t);
					/* Run one step of the coroutine; 0 means it finished */
					val = t->func(t->arg);
					/* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
					/* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
					assert(ash_nazg_gimbatul.type == type_hash("task_env"));
					if (!val) { /* Is task finished? */
						deactivate(t);
						t->terminate = TERMINATED;
						task_unref(t); /* Drop the scheduler's reference */
						stack = NULL;
					}
				}
			}
			t = next;
		}
		if (active_tasks <= 0)
			break; /* Nothing left to run, ever */
		/* When we get here, there are no runnable tasks left.
		       Wait until something happens.
		    */
		 {
			double	time = seconds();
			if (delayed_tasks()) {
				int	ms = msdiff(time);
				if (ms > 0) {
					/* Optional busy-wait phase: spin with zero-timeout polls
					   before blocking, to cut wakeup latency */
                                       if (the_app_xcom_cfg != NULL && the_app_xcom_cfg->m_poll_spin_loops)
                                       {
                                         u_int busyloop;
                                         for(busyloop = 0; busyloop < the_app_xcom_cfg->m_poll_spin_loops; busyloop++){
                                                 ADD_WAIT_EV(task_now(), __FILE__, __LINE__, "poll_wait(ms)", 0);
                                                 if(poll_wait(0)) /*Just poll */
                                                         goto done_wait;
                                                 ADD_WAIT_EV(task_now(), __FILE__, __LINE__, "poll_wait(ms) end", 0);
                                                 thread_yield();
                                          }
                                         }
					ADD_WAIT_EV(task_now(), __FILE__, __LINE__, "poll_wait(ms)", ms);
					poll_wait(ms); /* Wait at most ms milliseconds and poll for IO */
					ADD_WAIT_EV(task_now(), __FILE__, __LINE__, "poll_wait(ms) end", ms);
				}
done_wait:
				/* While tasks with expired timers */
				while (delayed_tasks() &&
				    msdiff(time) <= 0) {
					task_env * t = extract_first_delayed(); /* May be NULL */
					if (t)
						activate(t); /* Make it runnable */
				}
			} else {
				ADD_T_EV(task_now(), __FILE__, __LINE__, "poll_wait(-1)");
				poll_wait(-1); /* Wait and poll for IO */
				ADD_T_EV(seconds(), __FILE__, __LINE__, "poll_wait(-1) end");
				/*       } */
			}
			idle_time += seconds() - time;
		}
	}
  task_sys_deinit();
}
1131 
1132 
/* Resolve 'server' and fill in *sock_addr / *sock_size with an IPv4
   address for the given port (network byte order).
   Returns 1 on success, 0 if no suitable address could be found.
   Fix: the previous code copied the FIRST addrinfo entry unconditionally;
   an AF_INET6 result (sockaddr_in6, larger than sockaddr_in) would
   overflow the destination.  We now pick the first AF_INET entry whose
   length fits the destination buffer. */
static int	init_sockaddr(char *server, struct sockaddr_in *sock_addr,
                          socklen_t *sock_size, xcom_port port)
{
	/* Get address of server */
	struct addrinfo *addr = 0;
	struct addrinfo *a = 0;

	checked_getaddrinfo(server, 0, 0, &addr);

	if (!addr)
		return 0;

	/* Find the first IPv4 entry that fits in *sock_addr */
	for (a = addr; a; a = a->ai_next) {
		if (a->ai_family == AF_INET && a->ai_addrlen <= sizeof(*sock_addr))
			break;
	}
	if (!a) {
		/* Only non-IPv4 (or oversized) addresses returned */
		freeaddrinfo(addr);
		return 0;
	}

	/* Copy selected address */
	memcpy(sock_addr, a->ai_addr, a->ai_addrlen);
	*sock_size = a->ai_addrlen;
	sock_addr->sin_port = htons(port);

	/* Clean up allocated memory by getaddrinfo */
	freeaddrinfo(addr);

	return 1;
}
1154 
1155 
#if TASK_DBUG_ON
/* Debug helper: print a sockaddr's address family (as unsigned and
   signed) followed by the raw bytes of sa_data, using the file's
   debug-output macros. */
static void print_sockaddr(struct sockaddr *a)
{
	u_int	i;
	GET_GOUT;
	NDBG(a->sa_family,u); NDBG(a->sa_family,d);
	STRLIT(" data ");
	for (i = 0; i < sizeof(a->sa_data); i++) {
		NPUT((unsigned char)a->sa_data[i],d);
	}
  PRINT_GOUT;
  FREE_GOUT;
}
#endif
1170 
1171 
/* Coroutine: establish a non-blocking TCP connection to server:port.
   Must be driven by the task scheduler (TASK_YIELD suspends the task).
   On success, TASK_RETURN delivers the connected fd through the task
   return mechanism; TASK_FAIL reports failure.  A 10-second timer
   bounds the connect; on timeout the socket is closed, and if the
   close itself reports EINPROGRESS the wait is retried. */
int	connect_tcp(char *server, xcom_port port, int *ret)
{
	DECL_ENV
	    int	fd;
	struct sockaddr sock_addr;
	socklen_t sock_size;
	END_ENV;
	TASK_BEGIN;
	DBGOUT(FN; STREXP(server); NDBG(port,d));
	/* Create socket */
	if ((ep->fd = xcom_checked_socket(AF_INET, SOCK_STREAM, 0).val) < 0) {
		DBGOUT(FN; NDBG(ep->fd, d));
		TASK_FAIL;
	}
	/* Make it non-blocking */
	unblock_fd(ep->fd);
	/* Get address of server */
	/* OHKFIX Move this before call to xcom_checked_socket and use addrinfo fields as params to socket call */
	if (!init_sockaddr(server, (struct sockaddr_in *)&ep->sock_addr,
	                   &ep->sock_size, port)) {
		DBGOUT(FN; NDBG(ep->fd, d); NDBG(ep->sock_size, d));
		TASK_FAIL;
	}
#if TASK_DBUG_ON
	DBGOUT(FN; print_sockaddr(&ep->sock_addr));
#endif
	/* Connect socket to address */
	 {
		result sock = {0,0};
		SET_OS_ERR(0);
		sock.val = connect(ep->fd, &ep->sock_addr, ep->sock_size);
		sock.funerr = to_errno(GET_OS_ERR);
		if (sock.val < 0) {
			/* Non-blocking connect: only hard errors are fatal here;
			   "in progress"-style errors fall through to the wait below */
			if (hard_connect_err(sock.funerr)) {
			  task_dump_err(sock.funerr);
				MAY_DBG(FN;
				    NDBG(ep->fd, d);
				    NDBG(ep->sock_size,d));
#if TASK_DBUG_ON
				DBGOUT(FN; print_sockaddr(&ep->sock_addr));
#endif
				DBGOUT(FN; NDBG(ep->fd, d); NDBG(ep->sock_size, d));
				close_socket(&ep->fd);
				TASK_FAIL;
			}
		}
		/* Wait until connect has finished */
retry:
		timed_wait_io(stack, ep->fd, 'w', 10.0);
		TASK_YIELD;
		/* See if we timed out here. If we did, connect may or may not be active.
		       If closing fails with EINPROGRESS, we need to retry the select.
		       If close does not fail, we know that connect has indeed failed, and we
		       exit from here and return -1 as socket fd */
		if(stack->interrupt){
			result shut = {0,0};
			stack->interrupt = 0;

			/* Try to close socket on timeout */
			shut = shut_close_socket(&ep->fd);
			DBGOUT(FN; NDBG(ep->fd, d); NDBG(ep->sock_size, d));
			task_dump_err(shut.funerr);
			if (from_errno(shut.funerr) == SOCK_EINPROGRESS)
				goto retry; /* Connect is still active */
			TASK_FAIL; /* Connect has failed */
		}


		{
			int	peer = 0;
			/* Sanity check before return: getpeername succeeds only
			   on a fully established connection */
			SET_OS_ERR(0);
			sock.val = peer = getpeername(ep->fd, &ep->sock_addr, &ep->sock_size);
			sock.funerr = to_errno(GET_OS_ERR);
			if (peer >= 0) {
				TASK_RETURN(ep->fd);
			} else {
				/* Something is wrong; fetch the pending socket error */
				socklen_t errlen = sizeof(sock.funerr);

				getsockopt(ep->fd, SOL_SOCKET, SO_ERROR, (void * ) & sock.funerr, &errlen);
				if (sock.funerr == 0) {
					sock.funerr = to_errno(SOCK_ECONNREFUSED);
				}

				shut_close_socket(&ep->fd);
				if (sock.funerr == 0)
					sock.funerr = to_errno(SOCK_ECONNREFUSED);
				TASK_FAIL;
			}
		}
	}
	FINALLY
	    TASK_END;
}
1267 
set_nodelay(int fd)1268 result set_nodelay(int fd)
1269 {
1270 	int	n = 1;
1271 	result ret = {0,0};
1272 
1273 	do{
1274 		SET_OS_ERR(0);
1275 		ret.val = setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, (char *) & n, sizeof n);
1276 		ret.funerr = to_errno(GET_OS_ERR);
1277 		DBGOUT(FN; NDBG(from_errno(ret.funerr),d));
1278 	}while(ret.val < 0 && can_retry(ret.funerr));
1279 	return ret;
1280 }
1281 
create_server_socket()1282 static result	create_server_socket()
1283 {
1284 	result	fd ={0,0};
1285 	/* Create socket */
1286 	if ((fd = xcom_checked_socket(PF_INET, SOCK_STREAM, 0)).val < 0) {
1287                 G_MESSAGE("Unable to create socket "
1288                           "(socket=%d, errno=%d)!",
1289                           fd.val, to_errno(GET_OS_ERR));
1290 		return fd;
1291 	}
1292 	{
1293 		int	reuse = 1;
1294 		SET_OS_ERR(0);
1295 		if (setsockopt(fd.val, SOL_SOCKET, SOCK_OPT_REUSEADDR, (void * ) & reuse, sizeof(reuse)) < 0) {
1296 			fd.funerr = to_errno(GET_OS_ERR);
1297                         G_MESSAGE("Unable to set socket options "
1298                                   "(socket=%d, errno=%d)!",
1299                                   fd.val, to_errno(GET_OS_ERR));
1300 			close_socket(&fd.val);
1301 			return fd;
1302 		}
1303 	}
1304 	return fd;
1305 }
1306 
1307 
/* Initialize *sock_addr as an IPv4 wildcard listen address for 'port'.
   The memset zeroes sin_addr, so the effective address is INADDR_ANY
   (0.0.0.0); the port is stored in network byte order.
   Fix: sin_family now uses AF_INET — POSIX specifies AF_* constants
   for address families in sockaddr; PF_* belongs to socket().  The two
   have identical values on supported platforms, so this is fully
   behavior-compatible with the previous PF_INET. */
static void init_server_addr(struct sockaddr_in *sock_addr, xcom_port port)
{
	memset(sock_addr, 0, sizeof(*sock_addr));
	sock_addr->sin_family = AF_INET;
	sock_addr->sin_port = htons(port);
}
1314 
1315 
/* Create, bind, and start listening on a TCP server socket for 'port'
   on all interfaces (0.0.0.0), then make it non-blocking.
   Returns the result from socket creation; on bind/listen failure the
   socket is closed and funerr carries the OS error. */
result	announce_tcp(xcom_port port)
{
	result	fd;
	struct sockaddr_in sock_addr;

	fd = create_server_socket();
	if (fd.val < 0) {
		return fd;
	}
	init_server_addr(&sock_addr, port);
	if (bind(fd.val, (struct sockaddr *)&sock_addr, sizeof(sock_addr)) < 0) {
          int err = to_errno(GET_OS_ERR);
          G_MESSAGE("Unable to bind to %s:%d (socket=%d, errno=%d)!",
                    "0.0.0.0", port, fd.val, err);
		goto err;
	}
        G_DEBUG("Successfully bound to %s:%d (socket=%d).",
                  "0.0.0.0", port, fd.val);
	if (listen(fd.val, 32) < 0) {
          int err = to_errno(GET_OS_ERR);
          G_MESSAGE("Unable to listen backlog to 32. "
                    "(socket=%d, errno=%d)!", fd.val, err);
		goto err;
	}
        G_DEBUG("Successfully set listen backlog to 32 "
                  "(socket=%d)!", fd.val);
	/* Make socket non-blocking */
	unblock_fd(fd.val);
	/* NOTE(review): fd.val is not modified by unblock_fd and was
	   already known to be >= 0 above, so this error branch looks
	   unreachable; presumably the intent was to check the return
	   value of unblock_fd — confirm against its definition. */
        if (fd.val < 0)
        {
          int err = to_errno(GET_OS_ERR);
          G_MESSAGE("Unable to unblock socket (socket=%d, errno=%d)!",
                    fd.val, err);
        }
        else
        {
          G_DEBUG("Successfully unblocked socket (socket=%d)!", fd.val);
        }
	return fd;

 err:
	/* Common failure path: record the OS error and close the socket */
	fd.funerr = to_errno(GET_OS_ERR);
	task_dump_err(fd.funerr);
	close_socket(&fd.val);
	return fd;
}
1362 
1363 
/* Coroutine: accept one incoming connection on listening socket fd.
   Suspends the task until fd is readable, retries accept() while it
   is interrupted by signals (EINTR), then TASK_RETURNs the new
   connection fd, or TASK_FAILs if accept reported an error. */
int	accept_tcp(int fd, int *ret)
{
	struct sockaddr sock_addr;
	DECL_ENV
	    int	connection;
	END_ENV;
	TASK_BEGIN;
	/* Wait for connection attempt */
	wait_io(stack, fd, 'r');
	TASK_YIELD;
	/* Spin on benign error code */
	{
		socklen_t size = sizeof sock_addr;
		result res = {0,0};
		do{
			SET_OS_ERR(0);
			res.val = ep->connection = accept(fd, (void * ) & sock_addr, &size);
			res.funerr = to_errno(GET_OS_ERR);
		}while(res.val < 0 && from_errno(res.funerr) == SOCK_EINTR);

		if (ep->connection < 0) {
			TASK_FAIL;
		}
	}
#if TASK_DBUG_ON
	DBGOUT(FN; print_sockaddr(&sock_addr));
#endif
	TASK_RETURN(ep->connection);
	FINALLY
	    TASK_END;
}
1395 
1396 
1397 #define STAT_INTERVAL 1.0
1398 
#if 0

/*
  This was disabled to prevent unnecessary build warnings.

  TODO:
  Needs to be assessed whether it should be removed altogether.
 */

/* Periodic task (disabled): once per STAT_INTERVAL, log idle time and
   the send/receive counters as per-second rates, then reset the
   counters for the next interval. */
static int	statistics_task(task_arg arg)
{
	DECL_ENV
	    double	next;
	END_ENV;
	TASK_BEGIN
	    idle_time = 0.0;
	send_count = 0;
	receive_count = 0;
	send_bytes = 0;
	receive_bytes = 0;
	ep->next = seconds() + STAT_INTERVAL;
	TASK_DELAY_UNTIL(ep->next);
	for(;;) {
		G_DEBUG("task system idle %f send/s %f receive/s %f send b/s %f receive b/s %f",
		    (idle_time / STAT_INTERVAL) * 100.0, send_count / STAT_INTERVAL, receive_count / STAT_INTERVAL,
		    send_bytes / STAT_INTERVAL, receive_bytes / STAT_INTERVAL);
		idle_time = 0.0;
		send_count = 0;
		receive_count = 0;
		send_bytes = 0;
		receive_bytes = 0;
		ep->next += STAT_INTERVAL;
		TASK_DELAY_UNTIL(ep->next);
	}
	FINALLY
	    TASK_END;
}
#endif
1437 
init_task_vars()1438 static void init_task_vars()
1439 {
1440 	stack = 0;
1441 	task_errno = 0;
1442 }
1443 
/* Initialize the task system: reset the scheduler globals, set up the
   task lists (runnable, free, and the master list ash_nazg_gimbatul),
   initialize the IO-task structure, and prime the cached clock used
   by seconds().  Must run before any task is created. */
void task_sys_init()
{
	DBGOUT(FN; NDBG(FD_SETSIZE,d));
	init_task_vars();
	/* type_hash tags each list so membership can be asserted later */
	link_init(&tasks, type_hash("task_env"));
	link_init(&free_tasks, type_hash("task_env"));
	link_init(&ash_nazg_gimbatul, type_hash("task_env"));
	/* assert(ash_nazg_gimbatul.suc > (linkage*)0x8000000); */
	/* assert(ash_nazg_gimbatul.pred > (linkage*)0x8000000); */
	iotasks_init(&iot);
	seconds(); /* Needed to initialize _now */
	/* task_new(statistics_task, null_arg, "statistics_task", 1);  */
}
1457 
1458 
/* Tear down the task system; currently only the IO-task state needs
   explicit cleanup (counterpart of iotasks_init in task_sys_init). */
static void task_sys_deinit()
{
  DBGOUT(FN);
  iotasks_deinit(&iot);
}
1464 
1465 /* purecov: begin deadcode */
is_running(task_env * t)1466 int	is_running(task_env *t)
1467 {
1468 	return t && t->terminate == RUN;
1469 }
1470 /* purecov: end */
1471 
/* Replace the task pointer *p with t, maintaining reference counts.
   The new task is referenced BEFORE the old one is released, so the
   call is safe even when *p and t are the same task (otherwise the
   unref could drop the last reference before the ref). */
void set_task(task_env**p, task_env *t)
{
	if (t)
		task_ref(t);
	if (*p)
		task_unref(*p);
	*p = t;
}
1480 
1481 
task_name()1482 const char	*task_name()
1483 {
1484 	return stack ? stack->name : "idle";
1485 }
1486 
1487 
1488 task_event task_events[MAX_TASK_EVENT];
1489 int cur_task_event;
1490 int max_task_event;
1491 
1492 #ifdef WIN
1493 #define snprintf(...) _snprintf(__VA_ARGS__)
1494 #endif
1495 
1496 /* purecov: begin deadcode */
/* Format one trace event into a static line buffer; an a_end event
   flushes the accumulated line to the trace log and resets the buffer.
   Padded events are followed by a separating space, unpadded events
   are not.  Uses static state, so single-threaded only — consistent
   with the file-level "not MT-safe" note.
   Fixes over the original:
   - The two near-identical switches (with/without trailing space) are
     merged using a separator string, halving the duplication.
   - snprintf returns the number of characters that WOULD have been
     written, so pos could grow past bufsize; the original then passed
     the wrapped size_t 'bufsize - pos' to snprintf and wrote
     buf[pos] out of bounds.  pos is now clamped before use. */
void	ev_print(task_event te)
{
	enum {
		bufsize = 10000
	};
	static char	buf[bufsize];
	static size_t pos = 0;
	/* Padded events get a trailing space, unpadded ones do not */
	char const *sep = te.pad ? " " : "";

	/* Clamp in case a previous call overflowed the buffer */
	if (pos >= bufsize)
		pos = bufsize - 1;

	switch (te.arg.type) {
	case a_int:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%d%s", te.arg.val.i, sep);
		break;
	case a_long:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%ld%s", te.arg.val.l, sep);
		break;
	case a_uint:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%u%s", te.arg.val.u_i, sep);
		break;
	case a_ulong:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%lu%s", te.arg.val.u_l, sep);
		break;
	case a_ulong_long:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%llu%s", te.arg.val.u_ll, sep);
		break;
	case a_float:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%f%s", te.arg.val.f, sep);
		break;
	case a_double:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%f%s", te.arg.val.d, sep);
		break;
	case a_void:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%p%s", te.arg.val.v, sep);
		break;
	case a_string:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "%s%s", te.arg.val.s, sep);
		break;
	case a_end:
		/* End of event: flush the line and start over */
		xcom_log(LOG_TRACE, buf);
		pos = 0;
		break;
	default:
		pos += (size_t)snprintf(&buf[pos], bufsize - pos, "???%s", sep);
	}
	/* Keep pos a valid index even if this call's snprintf truncated */
	if (pos >= bufsize)
		pos = bufsize - 1;
	buf[pos] = 0;
}
1580 
1581 
/* Append a padded argument to the circular trace-event buffer,
   advancing the cursor and tracking the high-water mark. */
void add_event(task_arg te)
{
	task_event *slot = &task_events[cur_task_event];

	slot->arg = te;
	slot->pad = 1;

	cur_task_event++;
	if (max_task_event < cur_task_event)
		max_task_event = cur_task_event;
	cur_task_event %= MAX_TASK_EVENT;
}
1592 
1593 
/* Append an unpadded argument (no trailing separator when printed)
   to the circular trace-event buffer, advancing the cursor and
   tracking the high-water mark. */
void add_unpad_event(task_arg te)
{
	task_event *slot = &task_events[cur_task_event];

	slot->arg = te;
	slot->pad = 0;

	cur_task_event++;
	if (max_task_event < cur_task_event)
		max_task_event = cur_task_event;
	cur_task_event %= MAX_TASK_EVENT;
}
1604 
1605 
/* Emit the common prefix of a trace event: absolute timestamp, delta
   since the previous event, source file, ":" separator, and state
   (line number).
   NOTE(review): the static 't' and the shared event buffer make this
   single-threaded only — consistent with the file-level comment that
   this code is not MT-safe. */
void add_base_event(double when, char const *file, int state)
{
	static double	t = 0.0; /* timestamp of the previous event */

	add_event(double_arg(when));
	add_event(double_arg(when - t)); /* delta since last event */
	t = when;
	add_unpad_event(string_arg(file));
	add_unpad_event(string_arg(":"));
	add_event(int_arg(state));
}
1617 
1618 
/* Record a complete trace event: base prefix (time/file/state), a
   description string, and the end marker that lets ev_print flush
   the line. */
void add_task_event(double when, char const *file, int state, char const *what)
{
	add_base_event(when, file, state);
	add_event(string_arg(what));
	add_event(end_arg());
}
1625 
1626 
/* Record a wait-type trace event: like add_task_event, but also
   logs the wait duration in milliseconds ("milli" tag + value). */
void add_wait_event(double when, char *file, int state, char *what, int milli)
{
	add_base_event(when, file, state);
	add_event(string_arg(what));

	add_event(string_arg("milli"));
	add_event(int_arg(milli));
	add_event(end_arg());
}
1636 
1637 
/* Print the trace events with indices in [start, end) via ev_print.
   The time parameter is currently unused. */
static void dump_range(double t MY_ATTRIBUTE((unused)
), int start, int end)
{
	int	idx = start;
	while (idx < end) {
		ev_print(task_events[idx]);
		idx++;
	}
}
1646 
1647 
/* Dump the circular trace-event buffer in chronological order:
   the older entries first (cur_task_event..max_task_event), then the
   most recent wrap-around portion (0..cur_task_event). */
void dump_task_events()
{
	double	t = 0.0;
	G_DEBUG("cur_task_event %d max_task_event %d", cur_task_event, max_task_event);
	/* Terminate any partially built event so ev_print flushes it */
	add_event(end_arg());
	dump_range(t, cur_task_event, max_task_event);
	dump_range(t, 0, cur_task_event);
}
1656 /* purecov: end */
1657