#include "declarations.h"
#include "worker_cb.h"

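/*
 * Thread entry point of the callback worker.  The thread marks itself as
 * cancellable, installs worker_cb_cancel() as its cleanup handler and then
 * loops: run any pending callback events, wait, and repeat until thread_wait()
 * reports termination or the thread is cancelled.
 */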
void worker_cb(void *ta) {
	struct t_ctx *ti;

	ti = (struct t_ctx *) ta;

	/* This thread likes to receive pthread_cancel() */
	ti->ar = THREAD_TERM_CANCEL;

	pthread_cleanup_push(worker_cb_cancel, ti);

	if(worker_cb_init(ti) == 0) {
		worker_cb_again:

		(void) callback_event_run(ti->tp);

		if(thread_wait(ti) == 0) goto worker_cb_again;
	}

	/* Free allocated resources and terminate thread (never reached, but pthread_cleanup_pop() needs to be there) */
	pthread_cleanup_pop(1);

	(void) worker_cb_finalize(ti);
}

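/*
 * Cleanup handler installed via pthread_cleanup_push(): release the
 * thread-private state (ti->tp) and mark the thread as stopped.
 */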
static void worker_cb_cancel(void *ta) {
	struct t_ctx *ti;

	ti = (struct t_ctx *) ta;

	if(ti->tp != NULL) {
		(void) worker_cb_cancel_local(ti, (struct t_str *) ti->tp);

		(void) free(ti->tp);

		ti->tp = NULL;
	}

	ti->tr = THREAD_STATE_STOP;
}

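/* Hook for module-specific cleanup; intentionally empty here. */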
static void worker_cb_cancel_local(__UNUSED__ struct t_ctx *ti, __UNUSED__ struct t_str *tp) {
}

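/* Terminate the worker thread via thread_exit(). */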
static void worker_cb_finalize(struct t_ctx *ti) {
	(void) thread_exit(ti, NULL);
}

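/*
 * Allocate the thread-private state, set the thread title, optionally pin
 * the thread to the CPU given by the "cpu_callback" configuration option
 * and initialise the callback pipeline.  Returns 0 on success.
 */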
static int worker_cb_init(struct t_ctx *ti) {
	int r;

	int *a;

	struct t_str *c;

	APP_MALLOC_RET_VALUE(c, sizeof(struct t_str), -1);

	ti->tp = c;

	(void) thread_title(ti, APPLICATION_EXEC "_cb");

	if((a = (int *) conf_fetch("cpu_callback")) != NULL) {
		if(*a != 0) (void) thread_affinity(*a);
	}

	r = worker_cb_init_local(ti, (struct t_str *) ti->tp);

	ti->tr = THREAD_STATE_IDLE;

	return(r);
}

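/*
 * Reset the callback pipeline: clear the lock and pending flags, reset the
 * sequence counter and empty both the staging (cb[0]) and run (cb[1]) slots.
 */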
static int worker_cb_init_local(__UNUSED__ struct t_ctx *ti, struct t_str *tp) {
	unsigned int i;

	tp->c_lck = IS_NO;
	tp->c_pnd = IS_NO;
	tp->c_cnt = 0;

	for(i = 0; i < WORKER_CB_CALLBACK_MAX; i++) {
		tp->cb[0][i].cb = NULL;
		tp->cb[1][i].cb = NULL;
	}

	return(0);
}

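/*
 * Execute all pending callback events.  The staging slots (cb[0]) are copied
 * into the run slots (cb[1]) and sorted by submission order while the
 * pipeline is locked against callback_event_add() (c_lck set to IS_MAYBE);
 * the copied callbacks are then invoked with the lock released, so new
 * events can be queued while callbacks run.
 */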
static void callback_event_run(struct t_str *tp) {
	unsigned int i, p;

	struct w_par *wp;
	struct d_par *dp;

	void (*cb)(struct w_par *, struct d_par *);

	/* If there are no pending callbacks, leave */
	if(tp->c_pnd != IS_YES) return;

	tp->c_pnd = IS_NO;

	/* Hold on a while if callback pipeline is locked by callback_event_add() */
	while(tp->c_lck == IS_YES) {
		(void) timer_wait(0, TIMER_RESOLUTION_VERYFAST);
	}

	tp->c_lck = IS_MAYBE;

	/* Make sorted copy of pipeline, skipping unused slots */
	p = callback_event_run_op(tp);

	tp->c_lck = IS_NO;

	for(i = 0; i < p; i++) {
		if(tp->cb[1][i].cb != NULL) {
			cb = tp->cb[1][i].cb;
			wp = tp->cb[1][i].wp;
			dp = tp->cb[1][i].dp;

			tp->cb[1][i].cb = NULL;

			(void) (*cb)(wp, dp);
		}
	}
}

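/*
 * Copy the used staging slots into the run array, clear them, sort the copy
 * by sequence number (n) so callbacks run in the order they were added, and
 * return the number of callbacks copied.
 */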
static unsigned int callback_event_run_op(struct t_str *tp) {
	unsigned int i, p;

	p = 0;

	for(i = 0; i < WORKER_CB_CALLBACK_MAX; i++) {
		if(tp->cb[0][i].cb == NULL) continue;

		tp->cb[1][p].n = tp->cb[0][i].n;

		tp->cb[1][p].cb = tp->cb[0][i].cb;
		tp->cb[1][p].wp = tp->cb[0][i].wp;
		tp->cb[1][p].dp = tp->cb[0][i].dp;

		tp->cb[0][i].cb = NULL;

		++p;
	}

	if(p > 1) (void) qsort((void *) tp->cb[1], (size_t) p, sizeof(struct t_cb), callback_event_run_cmp);

	tp->c_cnt = 0;

	return(p);
}

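/* qsort() comparison helper ordering callbacks by their sequence number. */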
static int callback_event_run_cmp(const void *a, const void *b) {
	const struct t_cb *c, *d;

	c = (const struct t_cb *) a;
	d = (const struct t_cb *) b;

	/* Compare sequence numbers without relying on subtraction, which may wrap or overflow */
	return((c->n > d->n) - (c->n < d->n));
}

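/*
 * Queue a callback for later execution by the callback worker thread.  The
 * entry is stored in the first free staging slot together with a sequence
 * number; if no slot is free, a warning is logged and the event is dropped.
 *
 * Hypothetical usage (handler and parameter names are illustrative only):
 *
 *   callback_event_add(ti, (void *) my_handler, my_wp, my_dp);
 */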
void callback_event_add(struct t_ctx *ti, void *cb, struct w_par *wp, struct d_par *dp) {
	unsigned int i;

	struct t_str *tp;

	/* Add callback only if callback function name is specified */
	if(dp->s == NULL || dp->s[0] == 0) return;

	tp = ti->tp;

	/* Hold on a while if callback pipeline is locked by callback_event_run() */
	while(tp->c_lck == IS_MAYBE) {
		(void) timer_wait(0, TIMER_RESOLUTION_VERYFAST);
	}

	tp->c_lck = IS_YES;

	for(i = 0; i < WORKER_CB_CALLBACK_MAX; i++) {
		if(tp->cb[0][i].cb == NULL) {
			tp->cb[0][i].n = ++tp->c_cnt;

			tp->cb[0][i].cb = cb;
			tp->cb[0][i].wp = wp;
			tp->cb[0][i].dp = dp;

			tp->c_pnd = IS_YES;
			tp->c_lck = IS_NO;

			return;
		}
	}

	tp->c_lck = IS_NO;

	(void) flush_error();

	LOGWARN(
		ERROR_SLIGHT, SUBSYSTEM,
		_("Failed to add callback event to queue because event queue is full")
	);
}