1 #include "declarations.h"
2 #include "worker_cb.h"
3
4
5
worker_cb(void * ta)6 void worker_cb(void *ta) {
7 struct t_ctx *ti;
8
9 ti = (struct t_ctx *) ta;
10
11 /* This thread likes to receive pthread_cancel() */
12 ti->ar = THREAD_TERM_CANCEL;
13
14 pthread_cleanup_push(worker_cb_cancel, ti);
15
16 if(worker_cb_init(ti) == 0) {
17 worker_cb_again:
18
19 (void) callback_event_run(ti->tp);
20
21 if(thread_wait(ti) == 0) goto worker_cb_again;
22 }
23
24 /* Free allocated resources and terminate thread (never reached, but pthread_cleanup_pop() needs to be there) */
25 pthread_cleanup_pop(1);
26
27 (void) worker_cb_finalize(ti);
28 }
29
worker_cb_cancel(void * ta)30 static void worker_cb_cancel(void *ta) {
31 struct t_ctx *ti;
32
33 ti = (struct t_ctx *) ta;
34
35 if(ti->tp != NULL) {
36 (void) worker_cb_cancel_local(ti, (struct t_str *) ti->tp);
37
38 (void) free(ti->tp);
39
40 ti->tp = NULL;
41 }
42
43 ti->tr = THREAD_STATE_STOP;
44 }
45
/*
 * Worker specific cancel hook, called by worker_cb_cancel() before the
 * worker state is freed. Intentionally empty: neither parameter is used.
 */
static void worker_cb_cancel_local(__UNUSED__ struct t_ctx *ti, __UNUSED__ struct t_str *tp) {
	/* Nothing to release here */
}
48
/*
 * Last step of worker_cb(): hands the thread context to thread_exit()
 * with a NULL second argument.
 */
static void worker_cb_finalize(struct t_ctx *ti) {
	(void) thread_exit(ti, NULL);
}
52
worker_cb_init(struct t_ctx * ti)53 static int worker_cb_init(struct t_ctx *ti) {
54 int r;
55
56 int *a;
57
58 struct t_str *c;
59
60 APP_MALLOC_RET_VALUE(c, sizeof(struct t_str), -1);
61
62 ti->tp = c;
63
64 (void) thread_title(ti, APPLICATION_EXEC "_cb");
65
66 if((a = (int *) conf_fetch("cpu_callback")) != NULL) {
67 if(*a != 0) (void) thread_affinity(*a);
68 }
69
70 r = worker_cb_init_local(ti, (struct t_str *) ti->tp);
71
72 ti->tr = THREAD_STATE_IDLE;
73
74 return(r);
75 }
76
/*
 * Worker specific initialization: resets the lock flag, the pending flag
 * and the event counter, and marks every slot of both callback tables
 * (cb[0] and cb[1]) as unused by clearing its callback pointer.
 *
 * Always returns 0.
 */
static int worker_cb_init_local(__UNUSED__ struct t_ctx *ti, struct t_str *tp) {
	unsigned int slot = 0;

	tp->c_lck = IS_NO;
	tp->c_pnd = IS_NO;
	tp->c_cnt = 0;

	while(slot < WORKER_CB_CALLBACK_MAX) {
		tp->cb[0][slot].cb = NULL;
		tp->cb[1][slot].cb = NULL;

		++slot;
	}

	return(0);
}
91
callback_event_run(struct t_str * tp)92 static void callback_event_run(struct t_str *tp) {
93 unsigned int i, p;
94
95 struct w_par *wp;
96 struct d_par *dp;
97
98 void (*cb)(struct w_par *, struct d_par *);
99
100 /* If there are no pending callbacks, leave */
101 if(tp->c_pnd != IS_YES) return;
102
103 tp->c_pnd = IS_NO;
104
105 /* Hold on a while if callback pipeline is locked by callback_event_add() */
106 while(tp->c_lck == IS_YES) {
107 (void) timer_wait(0, TIMER_RESOLUTION_VERYFAST);
108 }
109
110 tp->c_lck = IS_MAYBE;
111
112 /* Make sorted copy of pipeline, skipping unused slots */
113 p = callback_event_run_op(tp);
114
115 tp->c_lck = IS_NO;
116
117 for(i = 0; i < p; i++) {
118 if(tp->cb[1][i].cb != NULL) {
119 cb = tp->cb[1][i].cb;
120 wp = tp->cb[1][i].wp;
121 dp = tp->cb[1][i].dp;
122
123 tp->cb[1][i].cb = NULL;
124
125 (void) (*cb)(wp, dp);
126 }
127 }
128 }
129
callback_event_run_op(struct t_str * tp)130 static unsigned int callback_event_run_op(struct t_str *tp) {
131 unsigned int i, p;
132
133 p = 0;
134
135 for(i = 0; i < WORKER_CB_CALLBACK_MAX; i++) {
136 if(tp->cb[0][i].cb == NULL) continue;
137
138 tp->cb[1][p].n = tp->cb[0][i].n;
139
140 tp->cb[1][p].cb = tp->cb[0][i].cb;
141 tp->cb[1][p].wp = tp->cb[0][i].wp;
142 tp->cb[1][p].dp = tp->cb[0][i].dp;
143
144 tp->cb[0][i].cb = NULL;
145
146 ++p;
147 }
148
149 if(p > 1) (void) qsort((void *) tp->cb[1], (size_t) p, sizeof(struct t_cb), callback_event_run_cmp);
150
151 tp->c_cnt = 0;
152
153 return(p);
154 }
155
callback_event_run_cmp(const void * a,const void * b)156 static int callback_event_run_cmp(const void *a, const void *b) {
157 const struct t_cb *c, *d;
158
159 c = (const struct t_cb *) a;
160 d = (const struct t_cb *) b;
161
162 return(c->n - d->n);
163 }
164
callback_event_add(struct t_ctx * ti,void * cb,struct w_par * wp,struct d_par * dp)165 void callback_event_add(struct t_ctx *ti, void *cb, struct w_par *wp, struct d_par *dp) {
166 unsigned int i;
167
168 struct t_str *tp;
169
170 /* Add callback only if callback function name is specified */
171 if(dp->s == NULL || dp->s[0] == 0) return;
172
173 tp = ti->tp;
174
175 /* Hold on a while if callback pipeline is locked by callback_event_run() */
176 while(tp->c_lck == IS_MAYBE) {
177 (void) timer_wait(0, TIMER_RESOLUTION_VERYFAST);
178 }
179
180 tp->c_lck = IS_YES;
181
182 for(i = 0; i < WORKER_CB_CALLBACK_MAX; i++) {
183 if(tp->cb[0][i].cb == NULL) {
184 tp->cb[0][i].n = ++tp->c_cnt;
185
186 tp->cb[0][i].cb = cb;
187 tp->cb[0][i].wp = wp;
188 tp->cb[0][i].dp = dp;
189
190 tp->c_pnd = IS_YES;
191 tp->c_lck = IS_NO;
192
193 return;
194 }
195 }
196
197 tp->c_lck = IS_NO;
198
199 (void) flush_error();
200
201 LOGWARN(
202 ERROR_SLIGHT, SUBSYSTEM,
203 _("Failed to add callback event to queue because event queue is full")
204 );
205 }
206