1 #include <time.h>
2 #include <sys/time.h>
3 #include <stdlib.h>
4 #include <signal.h>
5 #include <pthread.h>
6 #include <inttypes.h>
7 // #include <stdio.h>
8 
9 #include "massert.h"
10 #include "clocks.h"
11 
// One scheduled callback stored in the delay heap.
// (The struct tag carries no leading underscore: such identifiers are
// reserved at file scope by the C standard.)
typedef struct heapelem {
	void (*fn)(void *);	// callback invoked by the scheduler thread
	void *udata;		// opaque argument handed to fn
	uint64_t firetime;	// monotonic_useconds() value at which fn becomes due
} heapelem;
17 
// Module state. The functions in this file access heap, heapelements,
// heapsize, exitflag and waiting while holding dlock (delay_init and the
// final cleanup in delay_term touch them without it).

// array holding the pending entries, kept ordered as a binary min-heap on firetime
static heapelem *heap;
// number of entries currently stored in heap
static uint32_t heapelements;
// number of entries heap has room for (doubled by delay_run when full)
static uint32_t heapsize;
// set to 1 by delay_term; makes the scheduler thread return
static uint8_t exitflag;
// set to 1 by the scheduler thread while it is blocked on dcond
static uint8_t waiting;
// mutex held around accesses to the state above
static pthread_mutex_t dlock;
// condition the scheduler thread waits on; signalled by delay_run and delay_term
static pthread_cond_t dcond;
// handle of the scheduler thread created in delay_init
static pthread_t delay_th;
26 
// Index arithmetic for a binary heap stored in a 0-based array.
// PARENT(x): index of the parent of node x (only meaningful for x > 0).
#define PARENT(x) (((x) - 1) / 2)
// CHILD(x): index of the left child of node x; the right child is CHILD(x)+1.
#define CHILD(x) ((2 * (x)) + 1)
29 
delay_heap_sort_down(void)30 void delay_heap_sort_down(void) {
31 	uint32_t l,r,m;
32 	uint32_t pos = 0;
33 	heapelem x;
34 	while (pos<heapelements) {
35 		l = CHILD(pos);
36 		r = l+1;
37 		if (l>=heapelements) {
38 			return;
39 		}
40 		m = l;
41 		if (r<heapelements && heap[r].firetime < heap[l].firetime) {
42 			m = r;
43 		}
44 		if (heap[pos].firetime <= heap[m].firetime) {
45 			return;
46 		}
47 		x = heap[pos];
48 		heap[pos] = heap[m];
49 		heap[m] = x;
50 		pos = m;
51 	}
52 }
53 
delay_heap_sort_up(void)54 uint8_t delay_heap_sort_up(void) {
55 	uint32_t pos = heapelements-1;
56 	uint32_t p;
57 	heapelem x;
58 	while (pos>0) {
59 		p = PARENT(pos);
60 		if (heap[pos].firetime >= heap[p].firetime) {
61 			return 0;
62 		}
63 		x = heap[pos];
64 		heap[pos] = heap[p];
65 		heap[p] = x;
66 		pos = p;
67 	}
68 	return 1;
69 }
70 
delay_scheduler(void * arg)71 void* delay_scheduler(void *arg) {
72 	uint64_t now;
73 	struct timespec ts;
74 	struct timeval tv;
75 	void (*fn)(void *);
76 	void *udata;
77 
78 	zassert(pthread_mutex_lock(&dlock));
79 	while (1) {
80 		if (exitflag) {
81 			zassert(pthread_mutex_unlock(&dlock));
82 			return arg;
83 		}
84 		if (heapelements>0) {
85 			now = monotonic_useconds();
86 //			printf("now: %"PRIu64" ; heap[0].firetime: %"PRIu64"\n",now,heap[0].firetime);
87 			if (now < heap[0].firetime) {
88 				gettimeofday(&tv, NULL);
89 				ts.tv_sec = tv.tv_sec + (heap[0].firetime - now) / 1000000;
90 				ts.tv_nsec = (tv.tv_usec + ((heap[0].firetime - now) % 1000000)) * 1000;
91 				while (ts.tv_nsec >= 1000000000) { // "if" should be enough here
92 					ts.tv_sec ++;
93 					ts.tv_nsec -= 1000000000;
94 				}
95 				waiting = 1;
96 				pthread_cond_timedwait(&dcond,&dlock,&ts);
97 				waiting = 0;
98 			} else {
99 				fn = heap[0].fn;
100 				udata = heap[0].udata;
101 				heapelements--;
102 				if (heapelements>0) {
103 					heap[0] = heap[heapelements];
104 					delay_heap_sort_down();
105 				}
106 				zassert(pthread_mutex_unlock(&dlock));
107 				(*fn)(udata);
108 				zassert(pthread_mutex_lock(&dlock));
109 			}
110 		} else {
111 			waiting = 1;
112 			zassert(pthread_cond_wait(&dcond,&dlock));
113 			waiting = 0;
114 		}
115 	}
116 	zassert(pthread_mutex_unlock(&dlock));
117 	return NULL;
118 }
119 
delay_run(void (* fn)(void *),void * udata,uint64_t useconds)120 void delay_run (void (*fn)(void *),void *udata,uint64_t useconds) {
121 	zassert(pthread_mutex_lock(&dlock));
122 	if (heapelements>=heapsize) {
123 		heapsize *= 2;
124 		heap = realloc(heap,sizeof(heapelem)*heapsize);
125 		passert(heap);
126 	}
127 	heap[heapelements].fn = fn;
128 	heap[heapelements].udata = udata;
129 	heap[heapelements].firetime = monotonic_useconds()+useconds;
130 //	printf("fire time: %"PRIu64"\n",heap[heapelements].firetime);
131 	heapelements++;
132 	if (delay_heap_sort_up() && waiting) {
133 		zassert(pthread_cond_signal(&dcond));
134 	}
135 	zassert(pthread_mutex_unlock(&dlock));
136 }
137 
delay_term(void)138 void delay_term(void) {
139 	zassert(pthread_mutex_lock(&dlock));
140 	exitflag = 1;
141 	if (waiting) {
142 		zassert(pthread_cond_signal(&dcond));
143 	}
144 	zassert(pthread_mutex_unlock(&dlock));
145 	zassert(pthread_join(delay_th,NULL));
146 	zassert(pthread_cond_destroy(&dcond));
147 	zassert(pthread_mutex_destroy(&dlock));
148 	free(heap);
149 	heap = NULL;
150 	heapsize = 0;
151 	heapelements = 0;
152 }
153 
delay_init(void)154 void delay_init(void) {
155 	pthread_attr_t thattr;
156 #ifndef WIN32
157 	sigset_t oldset;
158 	sigset_t newset;
159 #endif
160 
161 	exitflag = 0;
162 	waiting = 0;
163 	heap = malloc(sizeof(heapelem)*1024);
164 	passert(heap);
165 	heapelements = 0;
166 	heapsize = 1024;
167 	zassert(pthread_mutex_init(&dlock,NULL));
168 	zassert(pthread_cond_init(&dcond,NULL));
169 
170 	zassert(pthread_attr_init(&thattr));
171 	zassert(pthread_attr_setstacksize(&thattr,0x100000));
172 #ifndef WIN32
173 	sigemptyset(&newset);
174 	sigaddset(&newset, SIGTERM);
175 	sigaddset(&newset, SIGINT);
176 	sigaddset(&newset, SIGHUP);
177 	sigaddset(&newset, SIGQUIT);
178 	zassert(pthread_sigmask(SIG_BLOCK, &newset, &oldset));
179 #endif
180 	zassert(pthread_create(&delay_th,&thattr,delay_scheduler,NULL));
181 #ifndef WIN32
182 	zassert(pthread_sigmask(SIG_SETMASK, &oldset, NULL));
183 #endif
184 	zassert(pthread_attr_destroy(&thattr));
185 }
186