1 /*
2  * Copyright (c) 2015 Sippy Software, Inc., http://www.sippysoft.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  */
27 
28 #include <pthread.h>
29 #include <signal.h>
30 #include <stddef.h>
31 #include <stdlib.h>
32 #include <string.h>
33 
34 #include "rtpp_types.h"
35 #include "rtpp_mallocs.h"
36 #include "rtpp_refcnt.h"
37 #include "rtpp_queue.h"
38 #include "rtpp_wi.h"
39 #include "rtpp_time.h"
40 #include "rtpp_timed.h"
41 #include "rtpp_timed_fin.h"
42 
43 struct rtpp_timed_cf {
44     struct rtpp_timed pub;
45     struct rtpp_queue *q;
46     struct rtpp_queue *cmd_q;
47     double last_run;
48     double period;
49     pthread_t thread_id;
50     struct rtpp_wi *sigterm;
51     int wi_dsize;
52 };
53 
54 struct rtpp_timed_wi {
55     struct rtpp_timed_task pub;
56     rtpp_timed_cb_t cb_func;
57     rtpp_timed_cancel_cb_t cancel_cb_func;
58     void *cb_func_arg;
59     struct rtpp_refcnt *callback_rcnt;
60     double when;
61     double offset;
62     struct rtpp_timed_cf *timed_cf;
63     struct rtpp_wi *wi;
64     void *rco[0];
65 };
66 
/*
 * Recover the private task record from a pointer to the public task handle
 * embedded in it.
 */
#define TASKPUB2PVT(pubp) \
  ((struct rtpp_timed_wi *) \
  ((char *)(pubp) - offsetof(struct rtpp_timed_wi, pub)))
70 
71 static void rtpp_timed_destroy(struct rtpp_timed_cf *);
72 static int rtpp_timed_schedule(struct rtpp_timed *,
73   double offset, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t, void *);
74 static struct rtpp_timed_task *rtpp_timed_schedule_rc(struct rtpp_timed *,
75   double offset, struct rtpp_refcnt *, rtpp_timed_cb_t, rtpp_timed_cancel_cb_t,
76   void *);
77 static void rtpp_timed_wakeup(struct rtpp_timed *, double);
78 static void rtpp_timed_process(struct rtpp_timed_cf *, double);
79 static int rtpp_timed_cancel(struct rtpp_timed_task *);
80 
81 static void rtpp_timed_task_dtor(struct rtpp_timed_wi *);
82 
83 static void
rtpp_timed_queue_run(void * argp)84 rtpp_timed_queue_run(void *argp)
85 {
86     struct rtpp_timed_cf *rtcp;
87     struct rtpp_wi *wi;
88     struct rtpp_timed_wi *wi_data;
89     int signum;
90     double ctime;
91 
92     rtcp = (struct rtpp_timed_cf *)argp;
93     for (;;) {
94         wi = rtpp_queue_get_item(rtcp->cmd_q, 0);
95         signum = rtpp_wi_sgnl_get_signum(wi);
96         rtpp_wi_free(wi);
97         if (signum == SIGTERM) {
98             break;
99         }
100         ctime = getdtime();
101         rtpp_timed_process(rtcp, ctime);
102     }
103     /* We are terminating, get rid of all requests */
104     while (rtpp_queue_get_length(rtcp->q) > 0) {
105         wi = rtpp_queue_get_item(rtcp->q, 1);
106         wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize);
107         if (wi_data->cancel_cb_func != NULL) {
108             wi_data->cancel_cb_func(wi_data->cb_func_arg);
109         }
110         if (wi_data->callback_rcnt != NULL) {
111             CALL_SMETHOD(wi_data->callback_rcnt, decref);
112         }
113         CALL_SMETHOD(wi_data->pub.rcnt, decref);
114     }
115 }
116 
117 struct rtpp_timed *
rtpp_timed_ctor(double run_period)118 rtpp_timed_ctor(double run_period)
119 {
120     struct rtpp_timed_cf *rtcp;
121     struct rtpp_refcnt *rcnt;
122 
123     rtcp = rtpp_rzmalloc(sizeof(struct rtpp_timed_cf), &rcnt);
124     if (rtcp == NULL) {
125         goto e0;
126     }
127     rtcp->pub.rcnt = rcnt;
128     rtcp->q = rtpp_queue_init(0, "rtpp_timed(requests)");
129     if (rtcp->q == NULL) {
130         goto e1;
131     }
132     rtcp->cmd_q = rtpp_queue_init(1, "rtpp_timed(commands)");
133     if (rtcp->cmd_q == NULL) {
134         goto e2;
135     }
136     /*
137      * Pre-allocate sigterm, so that we don't have any malloc() in
138      * the destructor.
139      */
140     rtcp->sigterm = rtpp_wi_malloc_sgnl(SIGTERM, NULL, 0);
141     if (rtcp->sigterm == NULL) {
142         goto e3;
143     }
144     if (pthread_create(&rtcp->thread_id, NULL,
145       (void *(*)(void *))&rtpp_timed_queue_run, rtcp) != 0) {
146         goto e5;
147     }
148     rtcp->last_run = getdtime();
149     rtcp->period = run_period;
150     rtcp->wi_dsize = sizeof(struct rtpp_timed_wi) + rtpp_refcnt_osize();
151     rtcp->pub.wakeup = &rtpp_timed_wakeup;
152     rtcp->pub.schedule = &rtpp_timed_schedule;
153     rtcp->pub.schedule_rc = &rtpp_timed_schedule_rc;
154     CALL_SMETHOD(rtcp->pub.rcnt, attach, (rtpp_refcnt_dtor_t)&rtpp_timed_destroy,
155       rtcp);
156     return (&rtcp->pub);
157 
158 e5:
159     rtpp_wi_free(rtcp->sigterm);
160 e3:
161     rtpp_queue_destroy(rtcp->cmd_q);
162 e2:
163     rtpp_queue_destroy(rtcp->q);
164 e1:
165     CALL_SMETHOD(rtcp->pub.rcnt, decref);
166     free(rtcp);
167 e0:
168     return (NULL);
169 }
170 
171 static void
rtpp_timed_destroy(struct rtpp_timed_cf * rtpp_timed_cf)172 rtpp_timed_destroy(struct rtpp_timed_cf *rtpp_timed_cf)
173 {
174 
175     rtpp_queue_put_item(rtpp_timed_cf->sigterm, rtpp_timed_cf->cmd_q);
176     rtpp_timed_fin(&(rtpp_timed_cf->pub));
177     pthread_join(rtpp_timed_cf->thread_id, NULL);
178     rtpp_queue_destroy(rtpp_timed_cf->cmd_q);
179     rtpp_queue_destroy(rtpp_timed_cf->q);
180     free(rtpp_timed_cf);
181 }
182 
183 static struct rtpp_timed_task *
rtpp_timed_schedule_base(struct rtpp_timed * pub,double offset,struct rtpp_refcnt * callback_rcnt,rtpp_timed_cb_t cb_func,rtpp_timed_cancel_cb_t cancel_cb_func,void * cb_func_arg,int support_cancel)184 rtpp_timed_schedule_base(struct rtpp_timed *pub, double offset,
185   struct rtpp_refcnt *callback_rcnt, rtpp_timed_cb_t cb_func,
186   rtpp_timed_cancel_cb_t cancel_cb_func, void *cb_func_arg,
187   int support_cancel)
188 {
189     struct rtpp_wi *wi;
190     struct rtpp_timed_wi *wi_data;
191     struct rtpp_timed_cf *rtpp_timed_cf;
192 
193     rtpp_timed_cf = (struct rtpp_timed_cf *)pub;
194 
195     wi = rtpp_wi_malloc_udata((void **)&wi_data, rtpp_timed_cf->wi_dsize);
196     if (wi == NULL) {
197         return (NULL);
198     }
199     memset(wi_data, '\0', rtpp_timed_cf->wi_dsize);
200     wi_data->wi = wi;
201     wi_data->pub.rcnt = rtpp_refcnt_ctor_pa(&wi_data->rco[0]);
202     if (wi_data->pub.rcnt == NULL) {
203         rtpp_wi_free(wi);
204         return (NULL);
205     }
206     wi_data->cb_func = cb_func;
207     wi_data->cancel_cb_func = cancel_cb_func;
208     wi_data->cb_func_arg = cb_func_arg;
209     wi_data->when = getdtime() + offset;
210     wi_data->offset = offset;
211     wi_data->callback_rcnt = callback_rcnt;
212     if (callback_rcnt != NULL) {
213         CALL_SMETHOD(callback_rcnt, incref);
214     }
215     if (support_cancel != 0) {
216         wi_data->pub.cancel = &rtpp_timed_cancel;
217         wi_data->timed_cf = rtpp_timed_cf;
218         CALL_SMETHOD(pub->rcnt, incref);
219     }
220     CALL_SMETHOD(wi_data->pub.rcnt, incref);
221     rtpp_queue_put_item(wi, rtpp_timed_cf->q);
222     CALL_SMETHOD(wi_data->pub.rcnt, attach, (rtpp_refcnt_dtor_t)&rtpp_timed_task_dtor,
223       wi_data);
224     return (&(wi_data->pub));
225 }
226 
227 static struct rtpp_timed_task *
rtpp_timed_schedule_rc(struct rtpp_timed * pub,double offset,struct rtpp_refcnt * callback_rcnt,rtpp_timed_cb_t cb_func,rtpp_timed_cancel_cb_t cancel_cb_func,void * cb_func_arg)228 rtpp_timed_schedule_rc(struct rtpp_timed *pub, double offset,
229   struct rtpp_refcnt *callback_rcnt, rtpp_timed_cb_t cb_func,
230   rtpp_timed_cancel_cb_t cancel_cb_func, void *cb_func_arg)
231 {
232     struct rtpp_timed_task *tpub;
233 
234     tpub = rtpp_timed_schedule_base(pub, offset, callback_rcnt, cb_func,
235       cancel_cb_func, cb_func_arg, 1);
236     if (tpub == NULL) {
237         return (NULL);
238     }
239     return (tpub);
240 }
241 
242 static int
rtpp_timed_schedule(struct rtpp_timed * pub,double offset,rtpp_timed_cb_t cb_func,rtpp_timed_cancel_cb_t cancel_cb_func,void * cb_func_arg)243 rtpp_timed_schedule(struct rtpp_timed *pub, double offset,
244   rtpp_timed_cb_t cb_func, rtpp_timed_cancel_cb_t cancel_cb_func,
245   void *cb_func_arg)
246 {
247     struct rtpp_timed_task *tpub;
248 
249     tpub = rtpp_timed_schedule_base(pub, offset, NULL, cb_func, cancel_cb_func,
250       cb_func_arg, 0);
251     if (tpub == NULL) {
252         return (-1);
253     }
254     CALL_SMETHOD(tpub->rcnt, decref);
255     return (0);
256 }
257 
/* Argument bundle handed to rtpp_timed_istime() through the queue matcher. */
struct rtpp_timed_istime_arg {
    double ctime;   /* Time against which a task's `when' is compared */
    int wi_dsize;   /* Task payload size, needed to access the work item data */
};
262 
263 static int
rtpp_timed_istime(struct rtpp_wi * wi,void * p)264 rtpp_timed_istime(struct rtpp_wi *wi, void *p)
265 {
266     struct rtpp_timed_istime_arg *ap;
267     struct rtpp_timed_wi *wi_data;
268 
269     ap = (struct rtpp_timed_istime_arg *)p;
270     wi_data = rtpp_wi_data_get_ptr(wi, ap->wi_dsize, ap->wi_dsize);
271     if (wi_data->when <= ap->ctime) {
272        return (0);
273     }
274     return (1);
275 }
276 
277 static void
rtpp_timed_wakeup(struct rtpp_timed * pub,double ctime)278 rtpp_timed_wakeup(struct rtpp_timed *pub, double ctime)
279 {
280     struct rtpp_timed_cf *rtcp;
281     struct rtpp_wi *wi;
282 
283     rtcp = (struct rtpp_timed_cf *)pub;
284 
285     if (rtcp->last_run + rtcp->period > ctime)
286         return;
287 
288     wi = rtpp_wi_malloc_sgnl(SIGALRM, NULL, 0);
289     if (wi == NULL) {
290         return;
291     }
292     rtpp_queue_put_item(wi, rtcp->cmd_q);
293     rtcp->last_run = ctime;
294 }
295 
296 static void
rtpp_timed_process(struct rtpp_timed_cf * rtcp,double ctime)297 rtpp_timed_process(struct rtpp_timed_cf *rtcp, double ctime)
298 {
299     struct rtpp_wi *wi;
300     struct rtpp_timed_wi *wi_data;
301     struct rtpp_timed_istime_arg istime_arg;
302     enum rtpp_timed_cb_rvals cb_rval;
303 
304     istime_arg.ctime = ctime;
305     istime_arg.wi_dsize = rtcp->wi_dsize;
306     for (;;) {
307         wi = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_istime,
308           &istime_arg);
309         if (wi == NULL) {
310             return;
311         }
312         wi_data = rtpp_wi_data_get_ptr(wi, rtcp->wi_dsize, rtcp->wi_dsize);
313         cb_rval = wi_data->cb_func(ctime, wi_data->cb_func_arg);
314         if (cb_rval == CB_MORE) {
315             while (wi_data->when <= ctime) {
316                 /* Make sure next run is in the future */
317                 wi_data->when += wi_data->offset;
318             }
319             rtpp_queue_put_item(wi, rtcp->q);
320             continue;
321         }
322         if (wi_data->callback_rcnt != NULL) {
323             CALL_SMETHOD(wi_data->callback_rcnt, decref);
324         }
325         CALL_SMETHOD(wi_data->pub.rcnt, decref);
326     }
327 }
328 
/* Argument bundle handed to rtpp_timed_match_wi() through the queue matcher. */
struct rtpp_timed_match_wi_arg {
    int wi_dsize;                   /* Task payload size, needed to access the work item data */
    struct rtpp_timed_wi *wi_data;  /* Task record that is being looked for */
};
333 
334 static int
rtpp_timed_match_wi(struct rtpp_wi * wia,void * p)335 rtpp_timed_match_wi(struct rtpp_wi *wia, void *p)
336 {
337     struct rtpp_timed_match_wi_arg *ap;
338     struct rtpp_timed_wi *wia_data;
339 
340     ap = (struct rtpp_timed_match_wi_arg *)p;
341     wia_data = rtpp_wi_data_get_ptr(wia, ap->wi_dsize, ap->wi_dsize);
342     if (wia_data == ap->wi_data) {
343         return (0);
344     }
345     return (1);
346 }
347 
348 static void
rtpp_timed_task_dtor(struct rtpp_timed_wi * wi_data)349 rtpp_timed_task_dtor(struct rtpp_timed_wi *wi_data)
350 {
351 
352     rtpp_timed_task_fin(&(wi_data->pub));
353     if (wi_data->timed_cf != NULL) {
354         CALL_SMETHOD(wi_data->timed_cf->pub.rcnt, decref);
355     }
356     rtpp_wi_free(wi_data->wi);
357 }
358 
359 static int
rtpp_timed_cancel(struct rtpp_timed_task * taskpub)360 rtpp_timed_cancel(struct rtpp_timed_task *taskpub)
361 {
362     struct rtpp_wi *wim;
363     struct rtpp_timed_cf *rtcp;
364     struct rtpp_timed_match_wi_arg match_arg;
365     struct rtpp_timed_wi *wi_data;
366 
367     wi_data = TASKPUB2PVT(taskpub);
368 
369     rtcp = wi_data->timed_cf;
370     match_arg.wi_dsize = rtcp->wi_dsize;
371     match_arg.wi_data = wi_data;
372     wim = rtpp_queue_get_first_matching(rtcp->q, rtpp_timed_match_wi,
373       &match_arg);
374     if (wim == NULL) {
375         return (0);
376     }
377     if (wi_data->cancel_cb_func != NULL) {
378         wi_data->cancel_cb_func(wi_data->cb_func_arg);
379     }
380     if (wi_data->callback_rcnt != NULL) {
381         CALL_SMETHOD(wi_data->callback_rcnt, decref);
382     }
383     CALL_SMETHOD(wi_data->pub.rcnt, decref);
384     return (1);
385 }
386