1 /*
2  * Copyright (c) 2004-2006 Maxim Sobolev <sobomax@FreeBSD.org>
3  * Copyright (c) 2006-2015 Sippy Software, Inc., http://www.sippysoft.com
4  * All rights reserved.
5  *
6  * Redistribution and use in source and binary forms, with or without
7  * modification, are permitted provided that the following conditions
8  * are met:
9  * 1. Redistributions of source code must retain the above copyright
10  *    notice, this list of conditions and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  *
15  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25  * SUCH DAMAGE.
26  *
27  */
28 
29 #include <sys/stat.h>
30 #include <assert.h>
31 #include <stddef.h>
32 #include <stdint.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <poll.h>
36 #include <pthread.h>
37 
38 #include "rtpp_types.h"
39 #include "rtpp_refcnt.h"
40 #include "rtpp_cfg_stable.h"
41 #include "rtpp_sessinfo.h"
42 #include "rtpp_sessinfo_fin.h"
43 #include "rtpp_pipe.h"
44 #include "rtpp_stream.h"
45 #include "rtpp_session.h"
46 #include "rtpp_socket.h"
47 #include "rtpp_mallocs.h"
48 
/*
 * Kinds of changes that can be queued in a poll table change log and are
 * later replayed by rtpp_sinfo_sync_polltbl().
 */
enum polltbl_hst_ops {
    HST_ADD,    /* Append a new entry to the poll table */
    HST_DEL,    /* Drop the entry that belongs to a stream */
    HST_UPD     /* Swap the socket of an already existing entry */
};
50 
51 struct rtpp_polltbl_hst_ent {
52    uint64_t stuid;
53    enum polltbl_hst_ops op;
54    struct rtpp_socket *skt;
55 };
56 
/*
 * Growable log of poll table changes that have not been applied yet.
 */
struct rtpp_polltbl_hst {
    /* Number of entries allocated in clog */
    int alen;
    /* Number of entries currently used in clog */
    int ulen;
    /* Number of entries clog is extended by each time it runs full */
    int ilen;
    /* The entries themselves, oldest first */
    struct rtpp_polltbl_hst_ent *clog;
    /* Copied into the poll table once the log has been applied to it */
    struct rtpp_weakref_obj *streams_wrt;
};
64 
65 struct rtpp_sessinfo_priv {
66    struct rtpp_sessinfo pub;
67    pthread_mutex_t lock;
68    struct rtpp_polltbl_hst hst_rtp;
69    struct rtpp_polltbl_hst hst_rtcp;
70 };
71 
/* Methods installed into the public structure by rtpp_sessinfo_ctor() */
static int rtpp_sinfo_append(struct rtpp_sessinfo *sessinfo,
  struct rtpp_session *sp, int index, struct rtpp_socket **new_fds);
static void rtpp_sinfo_update(struct rtpp_sessinfo *sessinfo,
  struct rtpp_session *sp, int index, struct rtpp_socket **new_fds);
static void rtpp_sinfo_remove(struct rtpp_sessinfo *sessinfo,
  struct rtpp_session *sp, int index);
static int rtpp_sinfo_sync_polltbl(struct rtpp_sessinfo *sessinfo,
  struct rtpp_polltbl *ptbl, int pipe_type);
/* Destructor attached to the reference counter of the object */
static void rtpp_sessinfo_dtor(struct rtpp_sessinfo_priv *pvt);
81 
/*
 * Map a pointer to the public structure back to the private structure that
 * embeds it as its "pub" member.
 */
#define PUB2PVT(pubp) \
    ((struct rtpp_sessinfo_priv *)((char *)(pubp) - \
      offsetof(struct rtpp_sessinfo_priv, pub)))
84 
85 static int
rtpp_polltbl_hst_alloc(struct rtpp_polltbl_hst * hp,int alen)86 rtpp_polltbl_hst_alloc(struct rtpp_polltbl_hst *hp, int alen)
87 {
88 
89     hp->clog = rtpp_zmalloc(sizeof(struct rtpp_polltbl_hst_ent) * alen);
90     if (hp->clog == NULL) {
91         return (-1);
92     }
93     hp->alen = hp->ilen = alen;
94     return (0);
95 }
96 
97 static void
rtpp_polltbl_hst_dtor(struct rtpp_polltbl_hst * hp)98 rtpp_polltbl_hst_dtor(struct rtpp_polltbl_hst *hp)
99 {
100     int i;
101     struct rtpp_polltbl_hst_ent *hep;
102 
103     for (i = 0; i < hp->ulen; i++) {
104         hep = hp->clog + i;
105         if (hep->skt != NULL) {
106             CALL_SMETHOD(hep->skt->rcnt, decref);
107         }
108     }
109     if (hp->alen > 0) {
110         free(hp->clog);
111     }
112 }
113 
114 static int
rtpp_polltbl_hst_extend(struct rtpp_polltbl_hst * hp)115 rtpp_polltbl_hst_extend(struct rtpp_polltbl_hst *hp)
116 {
117     struct rtpp_polltbl_hst_ent *clog_new;
118 
119     clog_new = realloc(hp->clog, sizeof(struct rtpp_polltbl_hst_ent) *
120       (hp->alen + hp->ilen));
121     if (clog_new == NULL) {
122         return (-1);
123     }
124     hp->alen += hp->ilen;
125     hp->clog = clog_new;
126     return (0);
127 }
128 
129 static void
rtpp_polltbl_hst_record(struct rtpp_polltbl_hst * hp,enum polltbl_hst_ops op,uint64_t stuid,struct rtpp_socket * skt)130 rtpp_polltbl_hst_record(struct rtpp_polltbl_hst *hp, enum polltbl_hst_ops op,
131   uint64_t stuid, struct rtpp_socket *skt)
132 {
133     struct rtpp_polltbl_hst_ent *hpe;
134 
135     hpe = hp->clog + hp->ulen;
136     hpe->op = op;
137     hpe->stuid = stuid;
138     hpe->skt = skt;
139     hp->ulen += 1;
140     if (skt != NULL) {
141         CALL_SMETHOD(skt->rcnt, incref);
142     }
143 }
144 
145 struct rtpp_sessinfo *
rtpp_sessinfo_ctor(struct rtpp_cfg_stable * cfsp)146 rtpp_sessinfo_ctor(struct rtpp_cfg_stable *cfsp)
147 {
148     struct rtpp_sessinfo *sessinfo;
149     struct rtpp_sessinfo_priv *pvt;
150     struct rtpp_refcnt *rcnt;
151 
152     pvt = rtpp_rzmalloc(sizeof(struct rtpp_sessinfo_priv), &rcnt);
153     if (pvt == NULL) {
154         return (NULL);
155     }
156     pvt->pub.rcnt = rcnt;
157     sessinfo = &(pvt->pub);
158     if (pthread_mutex_init(&pvt->lock, NULL) != 0) {
159         goto e5;
160     }
161     if (rtpp_polltbl_hst_alloc(&pvt->hst_rtp, 10) != 0) {
162         goto e6;
163     }
164     if (rtpp_polltbl_hst_alloc(&pvt->hst_rtcp, 10) != 0) {
165         goto e7;
166     }
167     pvt->hst_rtp.streams_wrt = cfsp->rtp_streams_wrt;
168     pvt->hst_rtcp.streams_wrt = cfsp->rtcp_streams_wrt;
169 
170     sessinfo->append = &rtpp_sinfo_append;
171     sessinfo->update = &rtpp_sinfo_update;
172     sessinfo->remove = &rtpp_sinfo_remove;
173     sessinfo->sync_polltbl = &rtpp_sinfo_sync_polltbl;
174 
175     CALL_SMETHOD(pvt->pub.rcnt, attach, (rtpp_refcnt_dtor_t)&rtpp_sessinfo_dtor,
176       pvt);
177     return (sessinfo);
178 
179 e7:
180     free(pvt->hst_rtp.clog);
181 e6:
182     pthread_mutex_destroy(&pvt->lock);
183 e5:
184     CALL_SMETHOD(pvt->pub.rcnt, decref);
185     free(pvt);
186     return (NULL);
187 }
188 
189 static void
rtpp_sessinfo_dtor(struct rtpp_sessinfo_priv * pvt)190 rtpp_sessinfo_dtor(struct rtpp_sessinfo_priv *pvt)
191 {
192 
193     rtpp_sessinfo_fin(&(pvt->pub));
194     rtpp_polltbl_hst_dtor(&pvt->hst_rtp);
195     rtpp_polltbl_hst_dtor(&pvt->hst_rtcp);
196     pthread_mutex_destroy(&pvt->lock);
197     free(pvt);
198 }
199 
200 static int
rtpp_sinfo_append(struct rtpp_sessinfo * sessinfo,struct rtpp_session * sp,int index,struct rtpp_socket ** new_fds)201 rtpp_sinfo_append(struct rtpp_sessinfo *sessinfo, struct rtpp_session *sp,
202   int index, struct rtpp_socket **new_fds)
203 {
204     struct rtpp_sessinfo_priv *pvt;
205     struct rtpp_stream *rtp, *rtcp;
206 
207     pvt = PUB2PVT(sessinfo);
208     pthread_mutex_lock(&pvt->lock);
209     if (pvt->hst_rtp.ulen == pvt->hst_rtp.alen) {
210         if (rtpp_polltbl_hst_extend(&pvt->hst_rtp) < 0) {
211             return (-1);
212         }
213     }
214     if (pvt->hst_rtcp.ulen == pvt->hst_rtcp.alen) {
215         if (rtpp_polltbl_hst_extend(&pvt->hst_rtcp) < 0) {
216             return (-1);
217         }
218     }
219     rtp = sp->rtp->stream[index];
220     CALL_SMETHOD(rtp, set_skt, new_fds[0]);
221     rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_ADD, rtp->stuid, new_fds[0]);
222     rtcp = sp->rtcp->stream[index];
223     CALL_SMETHOD(rtcp, set_skt, new_fds[1]);
224     rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_ADD, rtcp->stuid, new_fds[1]);
225 
226     pthread_mutex_unlock(&pvt->lock);
227     return (0);
228 }
229 
230 static int
find_polltbl_idx(struct rtpp_polltbl * ptp,uint64_t stuid)231 find_polltbl_idx(struct rtpp_polltbl *ptp, uint64_t stuid)
232 {
233     int i;
234 
235     for (i = 0; i < ptp->curlen; i++) {
236         if (ptp->mds[i].stuid != stuid)
237             continue;
238         return (i);
239     }
240     return (-1);
241 }
242 
243 static void
rtpp_sinfo_update(struct rtpp_sessinfo * sessinfo,struct rtpp_session * sp,int index,struct rtpp_socket ** new_fds)244 rtpp_sinfo_update(struct rtpp_sessinfo *sessinfo, struct rtpp_session *sp,
245   int index, struct rtpp_socket **new_fds)
246 {
247     struct rtpp_sessinfo_priv *pvt;
248     struct rtpp_stream *rtp, *rtcp;
249     struct rtpp_socket *old_fd;
250 
251     pvt = PUB2PVT(sessinfo);
252 
253     pthread_mutex_lock(&pvt->lock);
254     if (pvt->hst_rtp.ulen == pvt->hst_rtp.alen) {
255         if (rtpp_polltbl_hst_extend(&pvt->hst_rtp) < 0) {
256             return;
257         }
258     }
259     if (pvt->hst_rtcp.ulen == pvt->hst_rtcp.alen) {
260         if (rtpp_polltbl_hst_extend(&pvt->hst_rtcp) < 0) {
261             return;
262         }
263     }
264     rtp = sp->rtp->stream[index];
265     old_fd = CALL_SMETHOD(rtp, update_skt, new_fds[0]);
266     if (old_fd != NULL) {
267         rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_UPD, rtp->stuid, new_fds[0]);
268         CALL_SMETHOD(old_fd->rcnt, decref);
269     } else {
270         rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_ADD, rtp->stuid, new_fds[0]);
271     }
272     rtcp = sp->rtcp->stream[index];
273     old_fd = CALL_SMETHOD(rtcp, update_skt, new_fds[1]);
274     if (old_fd != NULL) {
275         rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_UPD, rtcp->stuid, new_fds[1]);
276         CALL_SMETHOD(old_fd->rcnt, decref);
277     } else {
278         rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_ADD, rtcp->stuid, new_fds[1]);
279     }
280 
281     pthread_mutex_unlock(&pvt->lock);
282 }
283 
284 static void
rtpp_sinfo_remove(struct rtpp_sessinfo * sessinfo,struct rtpp_session * sp,int index)285 rtpp_sinfo_remove(struct rtpp_sessinfo *sessinfo, struct rtpp_session *sp,
286   int index)
287 {
288     struct rtpp_sessinfo_priv *pvt;
289     struct rtpp_stream *rtp, *rtcp;
290     struct rtpp_socket *fd;
291 
292     pvt = PUB2PVT(sessinfo);
293 
294     pthread_mutex_lock(&pvt->lock);
295     if (pvt->hst_rtp.ulen == pvt->hst_rtp.alen) {
296         if (rtpp_polltbl_hst_extend(&pvt->hst_rtp) < 0) {
297             return;
298         }
299     }
300     if (pvt->hst_rtcp.ulen == pvt->hst_rtcp.alen) {
301         if (rtpp_polltbl_hst_extend(&pvt->hst_rtcp) < 0) {
302             return;
303         }
304     }
305     rtp = sp->rtp->stream[index];
306     fd = CALL_SMETHOD(rtp, get_skt);
307     if (fd != NULL) {
308         rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_DEL, rtp->stuid, NULL);
309         CALL_SMETHOD(fd->rcnt, decref);
310     }
311     rtcp = sp->rtcp->stream[index];
312     fd = CALL_SMETHOD(rtcp, get_skt);
313     if (fd != NULL) {
314         rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_DEL, rtcp->stuid, NULL);
315         CALL_SMETHOD(fd->rcnt, decref);
316     }
317 
318     pthread_mutex_unlock(&pvt->lock);
319 }
320 
321 void
rtpp_polltbl_free(struct rtpp_polltbl * ptbl)322 rtpp_polltbl_free(struct rtpp_polltbl *ptbl)
323 {
324     int i;
325 
326     if (ptbl->aloclen == 0) {
327         return;
328     }
329     if (ptbl->curlen > 0) {
330         for (i = 0; i < ptbl->curlen; i++) {
331             CALL_SMETHOD(ptbl->mds[i].skt->rcnt, decref);
332         }
333     }
334     free(ptbl->pfds);
335     free(ptbl->mds);
336 }
337 
338 static int
rtpp_sinfo_sync_polltbl(struct rtpp_sessinfo * sessinfo,struct rtpp_polltbl * ptbl,int pipe_type)339 rtpp_sinfo_sync_polltbl(struct rtpp_sessinfo *sessinfo,
340   struct rtpp_polltbl *ptbl, int pipe_type)
341 {
342     struct rtpp_sessinfo_priv *pvt;
343     struct pollfd *pfds;
344     struct rtpp_polltbl_mdata *mds;
345     struct rtpp_polltbl_hst *hp;
346     int i;
347 
348     pvt = PUB2PVT(sessinfo);
349 
350     pthread_mutex_lock(&pvt->lock);
351     hp = (pipe_type == PIPE_RTP) ? &pvt->hst_rtp : &pvt->hst_rtcp;
352 
353     if (hp->ulen == 0) {
354         pthread_mutex_unlock(&pvt->lock);
355         return (0);
356     }
357 
358     if (hp->ulen > ptbl->aloclen - ptbl->curlen) {
359         int alen = hp->ulen + ptbl->curlen;
360 
361         pfds = realloc(ptbl->pfds, (alen * sizeof(struct pollfd)));
362         mds = realloc(ptbl->mds, (alen * sizeof(struct rtpp_polltbl_mdata)));
363         if (pfds != NULL)
364             ptbl->pfds = pfds;
365         if (mds != NULL)
366             ptbl->mds = mds;
367         if (pfds == NULL || mds == NULL) {
368             pthread_mutex_unlock(&pvt->lock);
369             return (-1);
370         }
371         ptbl->aloclen = alen;
372     }
373 
374     for (i = 0; i < hp->ulen; i++) {
375         struct rtpp_polltbl_hst_ent *hep;
376         int session_index, movelen;
377 
378         hep = hp->clog + i;
379         switch (hep->op) {
380         case HST_ADD:
381 #ifdef RTPP_DEBUG
382             assert(find_polltbl_idx(ptbl, hep->stuid) < 0);
383 #endif
384             session_index = ptbl->curlen;
385             ptbl->pfds[session_index].fd = CALL_METHOD(hep->skt, getfd);
386             ptbl->pfds[session_index].events = POLLIN;
387             ptbl->pfds[session_index].revents = 0;
388             ptbl->mds[session_index].stuid = hep->stuid;
389             ptbl->mds[session_index].skt = hep->skt;
390             ptbl->curlen++;
391             ptbl->revision++;
392             break;
393 
394         case HST_DEL:
395             session_index = find_polltbl_idx(ptbl, hep->stuid);
396             assert(session_index > -1);
397             CALL_SMETHOD(ptbl->mds[session_index].skt->rcnt, decref);
398             movelen = (ptbl->curlen - session_index - 1);
399             if (movelen > 0) {
400                 memmove(&ptbl->pfds[session_index], &ptbl->pfds[session_index + 1],
401                   movelen * sizeof(ptbl->pfds[0]));
402                 memmove(&ptbl->mds[session_index], &ptbl->mds[session_index + 1],
403                   movelen * sizeof(ptbl->mds[0]));
404             }
405             ptbl->curlen--;
406             ptbl->revision++;
407             break;
408 
409         case HST_UPD:
410             session_index = find_polltbl_idx(ptbl, hep->stuid);
411             assert(session_index > -1);
412             CALL_SMETHOD(ptbl->mds[session_index].skt->rcnt, decref);
413             ptbl->pfds[session_index].fd = CALL_METHOD(hep->skt, getfd);
414             ptbl->pfds[session_index].events = POLLIN;
415             ptbl->pfds[session_index].revents = 0;
416             ptbl->mds[session_index].skt = hep->skt;
417             ptbl->revision++;
418             break;
419         }
420     }
421     hp->ulen = 0;
422 
423     ptbl->streams_wrt = hp->streams_wrt;
424     pthread_mutex_unlock(&pvt->lock);
425     return (1);
426 }
427