1 /*
2 * Copyright (c) 2015 Sippy Software, Inc., http://www.sippysoft.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28 #include <pthread.h>
29 #include <signal.h>
30 #include <stddef.h>
31 #include <stdlib.h>
32 #include <string.h>
33
34 #include "rtpp_types.h"
35 #include "rtpp_mallocs.h"
36 #include "rtpp_refcnt.h"
37 #include "rtpp_queue.h"
38 #include "rtpp_wi.h"
39 #include "rtpp_time.h"
40 #include "rtpp_timed.h"
41 #include "rtpp_timed_fin.h"
42
/*
 * Private state of one timed-task scheduler instance.
 *
 * The public part has to remain the first member: the methods in this
 * file cast the struct rtpp_timed pointer they receive straight back to
 * struct rtpp_timed_cf.
 */
struct rtpp_timed_cf {
    /* Public interface returned by rtpp_timed_ctor() */
    struct rtpp_timed pub;
    /* Scheduled tasks waiting for their run time */
    struct rtpp_queue *q;
    /* Signal items (SIGALRM / SIGTERM) that drive the worker thread */
    struct rtpp_queue *cmd_q;
    /* Time of the last wakeup that queued a SIGALRM (construction time initially) */
    double last_run;
    /* Minimum interval between two wakeups that queue a SIGALRM */
    double period;
    /* Worker thread running rtpp_timed_queue_run() */
    pthread_t thread_id;
    /* SIGTERM item allocated up-front so the destructor needs no malloc() */
    struct rtpp_wi *sigterm;
    /* Size of the per-task payload: struct rtpp_timed_wi plus refcount storage */
    int wi_dsize;
};
53
54 struct rtpp_timed_wi {
55 struct rtpp_timed_task pub;
56 rtpp_timed_cb_t cb_func;
57 rtpp_timed_cancel_cb_t cancel_cb_func;
58 void *cb_func_arg;
59 struct rtpp_refcnt *callback_rcnt;
60 double when;
61 double offset;
62 struct rtpp_timed_cf *timed_cf;
63 struct rtpp_wi *wi;
64 void *rco[0];
65 };
66
/*
 * Recover the enclosing struct rtpp_timed_wi from a pointer to its
 * embedded public struct rtpp_timed_task member.
 */
#define TASKPUB2PVT(pubp) \
    ((struct rtpp_timed_wi *)((char *)(pubp) - \
      offsetof(struct rtpp_timed_wi, pub)))
70
/*
 * Forward declarations of the file-local methods that get wired into the
 * public method tables and refcount destructors below.
 */
static void rtpp_timed_destroy(struct rtpp_timed_cf *);
static int rtpp_timed_schedule(struct rtpp_timed *,
  double offset, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t, void *);
static struct rtpp_timed_task *rtpp_timed_schedule_rc(struct rtpp_timed *,
  double offset, struct rtpp_refcnt *, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t,
  void *);
static void rtpp_timed_wakeup(struct rtpp_timed *, double);
static void rtpp_timed_process(struct rtpp_timed_cf *, double);
static int rtpp_timed_cancel(struct rtpp_timed_task *);

/* Refcount destructor for a single scheduled task */
static void rtpp_timed_task_dtor(struct rtpp_timed_wi *);
82
83 static void
rtpp_timed_queue_run(void * argp)84 rtpp_timed_queue_run(void *argp)
85 {
86 struct rtpp_timed_cf *rtcp;
87 struct rtpp_wi *wi;
88 struct rtpp_timed_wi *wi_data;
89 int signum;
90 double ctime;
91
92 rtcp = (struct rtpp_timed_cf *)argp;
93 for (;;) {
94 wi = rtpp_queue_get_item(rtcp->cmd_q, 0);
95 signum = rtpp_wi_sgnl_get_signum(wi);
96 rtpp_wi_free(wi);
97 if (signum == SIGTERM) {
98 break;
99 }
100 ctime = getdtime();
101 rtpp_timed_process(rtcp, ctime);
102 }
103 /* We are terminating, get rid of all requests */
104 while (rtpp_queue_get_length(rtcp->q) > 0) {
105 wi = rtpp_queue_get_item(rtcp->q, 1);
106 wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize);
107 if (wi_data->cancel_cb_func != NULL) {
108 wi_data->cancel_cb_func(wi_data->cb_func_arg);
109 }
110 if (wi_data->callback_rcnt != NULL) {
111 CALL_SMETHOD(wi_data->callback_rcnt, decref);
112 }
113 CALL_SMETHOD(wi_data->pub.rcnt, decref);
114 }
115 }
116
117 struct rtpp_timed *
rtpp_timed_ctor(double run_period)118 rtpp_timed_ctor(double run_period)
119 {
120 struct rtpp_timed_cf *rtcp;
121 struct rtpp_refcnt *rcnt;
122
123 rtcp = rtpp_rzmalloc(sizeof(struct rtpp_timed_cf), &rcnt);
124 if (rtcp == NULL) {
125 goto e0;
126 }
127 rtcp->pub.rcnt = rcnt;
128 rtcp->q = rtpp_queue_init(0, "rtpp_timed(requests)");
129 if (rtcp->q == NULL) {
130 goto e1;
131 }
132 rtcp->cmd_q = rtpp_queue_init(1, "rtpp_timed(commands)");
133 if (rtcp->cmd_q == NULL) {
134 goto e2;
135 }
136 /*
137 * Pre-allocate sigterm, so that we don't have any malloc() in
138 * the destructor.
139 */
140 rtcp->sigterm = rtpp_wi_malloc_sgnl(SIGTERM, NULL, 0);
141 if (rtcp->sigterm == NULL) {
142 goto e3;
143 }
144 if (pthread_create(&rtcp->thread_id, NULL,
145 (void *(*)(void *))&rtpp_timed_queue_run, rtcp) != 0) {
146 goto e5;
147 }
148 rtcp->last_run = getdtime();
149 rtcp->period = run_period;
150 rtcp->wi_dsize = sizeof(struct rtpp_timed_wi) + rtpp_refcnt_osize();
151 rtcp->pub.wakeup = &rtpp_timed_wakeup;
152 rtcp->pub.schedule = &rtpp_timed_schedule;
153 rtcp->pub.schedule_rc = &rtpp_timed_schedule_rc;
154 CALL_SMETHOD(rtcp->pub.rcnt, attach, (rtpp_refcnt_dtor_t)&rtpp_timed_destroy,
155 rtcp);
156 return (&rtcp->pub);
157
158 e5:
159 rtpp_wi_free(rtcp->sigterm);
160 e3:
161 rtpp_queue_destroy(rtcp->cmd_q);
162 e2:
163 rtpp_queue_destroy(rtcp->q);
164 e1:
165 CALL_SMETHOD(rtcp->pub.rcnt, decref);
166 free(rtcp);
167 e0:
168 return (NULL);
169 }
170
171 static void
rtpp_timed_destroy(struct rtpp_timed_cf * rtpp_timed_cf)172 rtpp_timed_destroy(struct rtpp_timed_cf *rtpp_timed_cf)
173 {
174
175 rtpp_queue_put_item(rtpp_timed_cf->sigterm, rtpp_timed_cf->cmd_q);
176 rtpp_timed_fin(&(rtpp_timed_cf->pub));
177 pthread_join(rtpp_timed_cf->thread_id, NULL);
178 rtpp_queue_destroy(rtpp_timed_cf->cmd_q);
179 rtpp_queue_destroy(rtpp_timed_cf->q);
180 free(rtpp_timed_cf);
181 }
182
183 static struct rtpp_timed_task *
rtpp_timed_schedule_base(struct rtpp_timed * pub,double offset,struct rtpp_refcnt * callback_rcnt,rtpp_timed_cb_t cb_func,rtpp_timed_cancel_cb_t cancel_cb_func,void * cb_func_arg,int support_cancel)184 rtpp_timed_schedule_base(struct rtpp_timed *pub, double offset,
185 struct rtpp_refcnt *callback_rcnt, rtpp_timed_cb_t cb_func,
186 rtpp_timed_cancel_cb_t cancel_cb_func, void *cb_func_arg,
187 int support_cancel)
188 {
189 struct rtpp_wi *wi;
190 struct rtpp_timed_wi *wi_data;
191 struct rtpp_timed_cf *rtpp_timed_cf;
192
193 rtpp_timed_cf = (struct rtpp_timed_cf *)pub;
194
195 wi = rtpp_wi_malloc_udata((void **)&wi_data, rtpp_timed_cf->wi_dsize);
196 if (wi == NULL) {
197 return (NULL);
198 }
199 memset(wi_data, '\0', rtpp_timed_cf->wi_dsize);
200 wi_data->wi = wi;
201 wi_data->pub.rcnt = rtpp_refcnt_ctor_pa(&wi_data->rco[0]);
202 if (wi_data->pub.rcnt == NULL) {
203 rtpp_wi_free(wi);
204 return (NULL);
205 }
206 wi_data->cb_func = cb_func;
207 wi_data->cancel_cb_func = cancel_cb_func;
208 wi_data->cb_func_arg = cb_func_arg;
209 wi_data->when = getdtime() + offset;
210 wi_data->offset = offset;
211 wi_data->callback_rcnt = callback_rcnt;
212 if (callback_rcnt != NULL) {
213 CALL_SMETHOD(callback_rcnt, incref);
214 }
215 if (support_cancel != 0) {
216 wi_data->pub.cancel = &rtpp_timed_cancel;
217 wi_data->timed_cf = rtpp_timed_cf;
218 CALL_SMETHOD(pub->rcnt, incref);
219 }
220 CALL_SMETHOD(wi_data->pub.rcnt, incref);
221 rtpp_queue_put_item(wi, rtpp_timed_cf->q);
222 CALL_SMETHOD(wi_data->pub.rcnt, attach, (rtpp_refcnt_dtor_t)&rtpp_timed_task_dtor,
223 wi_data);
224 return (&(wi_data->pub));
225 }
226
227 static struct rtpp_timed_task *
rtpp_timed_schedule_rc(struct rtpp_timed * pub,double offset,struct rtpp_refcnt * callback_rcnt,rtpp_timed_cb_t cb_func,rtpp_timed_cancel_cb_t cancel_cb_func,void * cb_func_arg)228 rtpp_timed_schedule_rc(struct rtpp_timed *pub, double offset,
229 struct rtpp_refcnt *callback_rcnt, rtpp_timed_cb_t cb_func,
230 rtpp_timed_cancel_cb_t cancel_cb_func, void *cb_func_arg)
231 {
232 struct rtpp_timed_task *tpub;
233
234 tpub = rtpp_timed_schedule_base(pub, offset, callback_rcnt, cb_func,
235 cancel_cb_func, cb_func_arg, 1);
236 if (tpub == NULL) {
237 return (NULL);
238 }
239 return (tpub);
240 }
241
242 static int
rtpp_timed_schedule(struct rtpp_timed * pub,double offset,rtpp_timed_cb_t cb_func,rtpp_timed_cancel_cb_t cancel_cb_func,void * cb_func_arg)243 rtpp_timed_schedule(struct rtpp_timed *pub, double offset,
244 rtpp_timed_cb_t cb_func, rtpp_timed_cancel_cb_t cancel_cb_func,
245 void *cb_func_arg)
246 {
247 struct rtpp_timed_task *tpub;
248
249 tpub = rtpp_timed_schedule_base(pub, offset, NULL, cb_func, cancel_cb_func,
250 cb_func_arg, 0);
251 if (tpub == NULL) {
252 return (-1);
253 }
254 CALL_SMETHOD(tpub->rcnt, decref);
255 return (0);
256 }
257
/* Argument block passed to rtpp_timed_istime() through the queue matcher */
struct rtpp_timed_istime_arg {
    /* Time the tasks' due times are compared against */
    double ctime;
    /* Payload size needed to fetch the task data from a work item */
    int wi_dsize;
};
262
263 static int
rtpp_timed_istime(struct rtpp_wi * wi,void * p)264 rtpp_timed_istime(struct rtpp_wi *wi, void *p)
265 {
266 struct rtpp_timed_istime_arg *ap;
267 struct rtpp_timed_wi *wi_data;
268
269 ap = (struct rtpp_timed_istime_arg *)p;
270 wi_data = rtpp_wi_data_get_ptr(wi, ap->wi_dsize, ap->wi_dsize);
271 if (wi_data->when <= ap->ctime) {
272 return (0);
273 }
274 return (1);
275 }
276
277 static void
rtpp_timed_wakeup(struct rtpp_timed * pub,double ctime)278 rtpp_timed_wakeup(struct rtpp_timed *pub, double ctime)
279 {
280 struct rtpp_timed_cf *rtcp;
281 struct rtpp_wi *wi;
282
283 rtcp = (struct rtpp_timed_cf *)pub;
284
285 if (rtcp->last_run + rtcp->period > ctime)
286 return;
287
288 wi = rtpp_wi_malloc_sgnl(SIGALRM, NULL, 0);
289 if (wi == NULL) {
290 return;
291 }
292 rtpp_queue_put_item(wi, rtcp->cmd_q);
293 rtcp->last_run = ctime;
294 }
295
296 static void
rtpp_timed_process(struct rtpp_timed_cf * rtcp,double ctime)297 rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime)
298 {
299 struct rtpp_wi *wi;
300 struct rtpp_timed_wi *wi_data;
301 struct rtpp_timed_istime_arg istime_arg;
302 enum rtpp_timed_cb_rvals cb_rval;
303
304 istime_arg.ctime = ctime;
305 istime_arg.wi_dsize = rtcp->wi_dsize;
306 for (;;) {
307 wi = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_istime,
308 &istime_arg);
309 if (wi == NULL) {
310 return;
311 }
312 wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize);
313 cb_rval = wi_data->cb_func(ctime, wi_data->cb_func_arg);
314 if (cb_rval == CB_MORE) {
315 while (wi_data->when <= ctime) {
316 /* Make sure next run is in the future */
317 wi_data->when += wi_data->offset;
318 }
319 rtpp_queue_put_item(wi, rtcp->q);
320 continue;
321 }
322 if (wi_data->callback_rcnt != NULL) {
323 CALL_SMETHOD(wi_data->callback_rcnt, decref);
324 }
325 CALL_SMETHOD(wi_data->pub.rcnt, decref);
326 }
327 }
328
/* Argument block passed to rtpp_timed_match_wi() through the queue matcher */
struct rtpp_timed_match_wi_arg {
    /* Payload size needed to fetch the task data from a work item */
    int wi_dsize;
    /* Task payload being looked for */
    struct rtpp_timed_wi *wi_data;
};
333
334 static int
rtpp_timed_match_wi(struct rtpp_wi * wia,void * p)335 rtpp_timed_match_wi(struct rtpp_wi *wia, void *p)
336 {
337 struct rtpp_timed_match_wi_arg *ap;
338 struct rtpp_timed_wi *wia_data;
339
340 ap = (struct rtpp_timed_match_wi_arg *)p;
341 wia_data = rtpp_wi_data_get_ptr(wia, ap->wi_dsize, ap->wi_dsize);
342 if (wia_data == ap->wi_data) {
343 return (0);
344 }
345 return (1);
346 }
347
348 static void
rtpp_timed_task_dtor(struct rtpp_timed_wi * wi_data)349 rtpp_timed_task_dtor(struct rtpp_timed_wi *wi_data)
350 {
351
352 rtpp_timed_task_fin(&(wi_data->pub));
353 if (wi_data->timed_cf != NULL) {
354 CALL_SMETHOD(wi_data->timed_cf->pub.rcnt, decref);
355 }
356 rtpp_wi_free(wi_data->wi);
357 }
358
359 static int
rtpp_timed_cancel(struct rtpp_timed_task * taskpub)360 rtpp_timed_cancel(struct rtpp_timed_task *taskpub)
361 {
362 struct rtpp_wi *wim;
363 struct rtpp_timed_cf *rtcp;
364 struct rtpp_timed_match_wi_arg match_arg;
365 struct rtpp_timed_wi *wi_data;
366
367 wi_data = TASKPUB2PVT(taskpub);
368
369 rtcp = wi_data->timed_cf;
370 match_arg.wi_dsize = rtcp->wi_dsize;
371 match_arg.wi_data = wi_data;
372 wim = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_match_wi,
373 &match_arg);
374 if (wim == NULL) {
375 return (0);
376 }
377 if (wi_data->cancel_cb_func != NULL) {
378 wi_data->cancel_cb_func(wi_data->cb_func_arg);
379 }
380 if (wi_data->callback_rcnt != NULL) {
381 CALL_SMETHOD(wi_data->callback_rcnt, decref);
382 }
383 CALL_SMETHOD(wi_data->pub.rcnt, decref);
384 return (1);
385 }
386