1 /*
2 * Copyright (c) 2004-2006 Maxim Sobolev <sobomax@FreeBSD.org>
3 * Copyright (c) 2006-2015 Sippy Software, Inc., http://www.sippysoft.com
4 * All rights reserved.
5 *
6 * Redistribution and use in source and binary forms, with or without
7 * modification, are permitted provided that the following conditions
8 * are met:
9 * 1. Redistributions of source code must retain the above copyright
10 * notice, this list of conditions and the following disclaimer.
11 * 2. Redistributions in binary form must reproduce the above copyright
12 * notice, this list of conditions and the following disclaimer in the
13 * documentation and/or other materials provided with the distribution.
14 *
15 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
16 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
17 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
18 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
19 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
20 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
21 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
22 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
23 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
24 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
25 * SUCH DAMAGE.
26 *
27 */
28
29 #include <sys/stat.h>
30 #include <assert.h>
31 #include <stddef.h>
32 #include <stdint.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <poll.h>
36 #include <pthread.h>
37
38 #include "rtpp_types.h"
39 #include "rtpp_refcnt.h"
40 #include "rtpp_cfg_stable.h"
41 #include "rtpp_sessinfo.h"
42 #include "rtpp_sessinfo_fin.h"
43 #include "rtpp_pipe.h"
44 #include "rtpp_stream.h"
45 #include "rtpp_session.h"
46 #include "rtpp_socket.h"
47 #include "rtpp_mallocs.h"
48
/* Kinds of change that can be queued in a poll table history log. */
enum polltbl_hst_ops {
    HST_ADD,    /* append a new entry to the poll table */
    HST_DEL,    /* drop the entry that belongs to a stream */
    HST_UPD     /* swap the socket of an existing entry */
};
50
51 struct rtpp_polltbl_hst_ent {
52 uint64_t stuid;
53 enum polltbl_hst_ops op;
54 struct rtpp_socket *skt;
55 };
56
/*
 * Growable log of pending poll table changes for one kind of pipe.
 * The log is filled under the sessinfo lock and reset to empty once
 * rtpp_sinfo_sync_polltbl() has replayed it.
 */
struct rtpp_polltbl_hst {
    int alen;   /* Capacity of clog, in entries */
    int ulen;   /* Entries of clog currently in use */
    int ilen;   /* Growth step: entries added on every extension */
    struct rtpp_polltbl_hst_ent *clog;      /* The change log itself */
    struct rtpp_weakref_obj *streams_wrt;   /* Copied into the poll table on sync */
};
64
65 struct rtpp_sessinfo_priv {
66 struct rtpp_sessinfo pub;
67 pthread_mutex_t lock;
68 struct rtpp_polltbl_hst hst_rtp;
69 struct rtpp_polltbl_hst hst_rtcp;
70 };
71
/* Methods installed into the public object by rtpp_sessinfo_ctor(). */
static int rtpp_sinfo_append(struct rtpp_sessinfo *sessinfo,
  struct rtpp_session *sp, int index, struct rtpp_socket **new_fds);
static void rtpp_sinfo_update(struct rtpp_sessinfo *sessinfo,
  struct rtpp_session *sp, int index, struct rtpp_socket **new_fds);
static void rtpp_sinfo_remove(struct rtpp_sessinfo *sessinfo,
  struct rtpp_session *sp, int index);
static int rtpp_sinfo_sync_polltbl(struct rtpp_sessinfo *sessinfo,
  struct rtpp_polltbl *ptbl, int pipe_type);
/* Destructor attached to the object's reference counter. */
static void rtpp_sessinfo_dtor(struct rtpp_sessinfo_priv *pvt);
81
/*
 * Map a pointer to the embedded public part back to the private structure
 * that contains it.
 */
#define PUB2PVT(pubp) \
    ((struct rtpp_sessinfo_priv *)((char *)(pubp) - \
      offsetof(struct rtpp_sessinfo_priv, pub)))
84
85 static int
rtpp_polltbl_hst_alloc(struct rtpp_polltbl_hst * hp,int alen)86 rtpp_polltbl_hst_alloc(struct rtpp_polltbl_hst *hp, int alen)
87 {
88
89 hp->clog = rtpp_zmalloc(sizeof(struct rtpp_polltbl_hst_ent) * alen);
90 if (hp->clog == NULL) {
91 return (-1);
92 }
93 hp->alen = hp->ilen = alen;
94 return (0);
95 }
96
97 static void
rtpp_polltbl_hst_dtor(struct rtpp_polltbl_hst * hp)98 rtpp_polltbl_hst_dtor(struct rtpp_polltbl_hst *hp)
99 {
100 int i;
101 struct rtpp_polltbl_hst_ent *hep;
102
103 for (i = 0; i < hp->ulen; i++) {
104 hep = hp->clog + i;
105 if (hep->skt != NULL) {
106 CALL_SMETHOD(hep->skt->rcnt, decref);
107 }
108 }
109 if (hp->alen > 0) {
110 free(hp->clog);
111 }
112 }
113
114 static int
rtpp_polltbl_hst_extend(struct rtpp_polltbl_hst * hp)115 rtpp_polltbl_hst_extend(struct rtpp_polltbl_hst *hp)
116 {
117 struct rtpp_polltbl_hst_ent *clog_new;
118
119 clog_new = realloc(hp->clog, sizeof(struct rtpp_polltbl_hst_ent) *
120 (hp->alen + hp->ilen));
121 if (clog_new == NULL) {
122 return (-1);
123 }
124 hp->alen += hp->ilen;
125 hp->clog = clog_new;
126 return (0);
127 }
128
129 static void
rtpp_polltbl_hst_record(struct rtpp_polltbl_hst * hp,enum polltbl_hst_ops op,uint64_t stuid,struct rtpp_socket * skt)130 rtpp_polltbl_hst_record(struct rtpp_polltbl_hst *hp, enum polltbl_hst_ops op,
131 uint64_t stuid, struct rtpp_socket *skt)
132 {
133 struct rtpp_polltbl_hst_ent *hpe;
134
135 hpe = hp->clog + hp->ulen;
136 hpe->op = op;
137 hpe->stuid = stuid;
138 hpe->skt = skt;
139 hp->ulen += 1;
140 if (skt != NULL) {
141 CALL_SMETHOD(skt->rcnt, incref);
142 }
143 }
144
145 struct rtpp_sessinfo *
rtpp_sessinfo_ctor(struct rtpp_cfg_stable * cfsp)146 rtpp_sessinfo_ctor(struct rtpp_cfg_stable *cfsp)
147 {
148 struct rtpp_sessinfo *sessinfo;
149 struct rtpp_sessinfo_priv *pvt;
150 struct rtpp_refcnt *rcnt;
151
152 pvt = rtpp_rzmalloc(sizeof(struct rtpp_sessinfo_priv), &rcnt);
153 if (pvt == NULL) {
154 return (NULL);
155 }
156 pvt->pub.rcnt = rcnt;
157 sessinfo = &(pvt->pub);
158 if (pthread_mutex_init(&pvt->lock, NULL) != 0) {
159 goto e5;
160 }
161 if (rtpp_polltbl_hst_alloc(&pvt->hst_rtp, 10) != 0) {
162 goto e6;
163 }
164 if (rtpp_polltbl_hst_alloc(&pvt->hst_rtcp, 10) != 0) {
165 goto e7;
166 }
167 pvt->hst_rtp.streams_wrt = cfsp->rtp_streams_wrt;
168 pvt->hst_rtcp.streams_wrt = cfsp->rtcp_streams_wrt;
169
170 sessinfo->append = &rtpp_sinfo_append;
171 sessinfo->update = &rtpp_sinfo_update;
172 sessinfo->remove = &rtpp_sinfo_remove;
173 sessinfo->sync_polltbl = &rtpp_sinfo_sync_polltbl;
174
175 CALL_SMETHOD(pvt->pub.rcnt, attach, (rtpp_refcnt_dtor_t)&rtpp_sessinfo_dtor,
176 pvt);
177 return (sessinfo);
178
179 e7:
180 free(pvt->hst_rtp.clog);
181 e6:
182 pthread_mutex_destroy(&pvt->lock);
183 e5:
184 CALL_SMETHOD(pvt->pub.rcnt, decref);
185 free(pvt);
186 return (NULL);
187 }
188
189 static void
rtpp_sessinfo_dtor(struct rtpp_sessinfo_priv * pvt)190 rtpp_sessinfo_dtor(struct rtpp_sessinfo_priv *pvt)
191 {
192
193 rtpp_sessinfo_fin(&(pvt->pub));
194 rtpp_polltbl_hst_dtor(&pvt->hst_rtp);
195 rtpp_polltbl_hst_dtor(&pvt->hst_rtcp);
196 pthread_mutex_destroy(&pvt->lock);
197 free(pvt);
198 }
199
200 static int
rtpp_sinfo_append(struct rtpp_sessinfo * sessinfo,struct rtpp_session * sp,int index,struct rtpp_socket ** new_fds)201 rtpp_sinfo_append(struct rtpp_sessinfo *sessinfo, struct rtpp_session *sp,
202 int index, struct rtpp_socket **new_fds)
203 {
204 struct rtpp_sessinfo_priv *pvt;
205 struct rtpp_stream *rtp, *rtcp;
206
207 pvt = PUB2PVT(sessinfo);
208 pthread_mutex_lock(&pvt->lock);
209 if (pvt->hst_rtp.ulen == pvt->hst_rtp.alen) {
210 if (rtpp_polltbl_hst_extend(&pvt->hst_rtp) < 0) {
211 return (-1);
212 }
213 }
214 if (pvt->hst_rtcp.ulen == pvt->hst_rtcp.alen) {
215 if (rtpp_polltbl_hst_extend(&pvt->hst_rtcp) < 0) {
216 return (-1);
217 }
218 }
219 rtp = sp->rtp->stream[index];
220 CALL_SMETHOD(rtp, set_skt, new_fds[0]);
221 rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_ADD, rtp->stuid, new_fds[0]);
222 rtcp = sp->rtcp->stream[index];
223 CALL_SMETHOD(rtcp, set_skt, new_fds[1]);
224 rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_ADD, rtcp->stuid, new_fds[1]);
225
226 pthread_mutex_unlock(&pvt->lock);
227 return (0);
228 }
229
230 static int
find_polltbl_idx(struct rtpp_polltbl * ptp,uint64_t stuid)231 find_polltbl_idx(struct rtpp_polltbl *ptp, uint64_t stuid)
232 {
233 int i;
234
235 for (i = 0; i < ptp->curlen; i++) {
236 if (ptp->mds[i].stuid != stuid)
237 continue;
238 return (i);
239 }
240 return (-1);
241 }
242
243 static void
rtpp_sinfo_update(struct rtpp_sessinfo * sessinfo,struct rtpp_session * sp,int index,struct rtpp_socket ** new_fds)244 rtpp_sinfo_update(struct rtpp_sessinfo *sessinfo, struct rtpp_session *sp,
245 int index, struct rtpp_socket **new_fds)
246 {
247 struct rtpp_sessinfo_priv *pvt;
248 struct rtpp_stream *rtp, *rtcp;
249 struct rtpp_socket *old_fd;
250
251 pvt = PUB2PVT(sessinfo);
252
253 pthread_mutex_lock(&pvt->lock);
254 if (pvt->hst_rtp.ulen == pvt->hst_rtp.alen) {
255 if (rtpp_polltbl_hst_extend(&pvt->hst_rtp) < 0) {
256 return;
257 }
258 }
259 if (pvt->hst_rtcp.ulen == pvt->hst_rtcp.alen) {
260 if (rtpp_polltbl_hst_extend(&pvt->hst_rtcp) < 0) {
261 return;
262 }
263 }
264 rtp = sp->rtp->stream[index];
265 old_fd = CALL_SMETHOD(rtp, update_skt, new_fds[0]);
266 if (old_fd != NULL) {
267 rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_UPD, rtp->stuid, new_fds[0]);
268 CALL_SMETHOD(old_fd->rcnt, decref);
269 } else {
270 rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_ADD, rtp->stuid, new_fds[0]);
271 }
272 rtcp = sp->rtcp->stream[index];
273 old_fd = CALL_SMETHOD(rtcp, update_skt, new_fds[1]);
274 if (old_fd != NULL) {
275 rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_UPD, rtcp->stuid, new_fds[1]);
276 CALL_SMETHOD(old_fd->rcnt, decref);
277 } else {
278 rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_ADD, rtcp->stuid, new_fds[1]);
279 }
280
281 pthread_mutex_unlock(&pvt->lock);
282 }
283
284 static void
rtpp_sinfo_remove(struct rtpp_sessinfo * sessinfo,struct rtpp_session * sp,int index)285 rtpp_sinfo_remove(struct rtpp_sessinfo *sessinfo, struct rtpp_session *sp,
286 int index)
287 {
288 struct rtpp_sessinfo_priv *pvt;
289 struct rtpp_stream *rtp, *rtcp;
290 struct rtpp_socket *fd;
291
292 pvt = PUB2PVT(sessinfo);
293
294 pthread_mutex_lock(&pvt->lock);
295 if (pvt->hst_rtp.ulen == pvt->hst_rtp.alen) {
296 if (rtpp_polltbl_hst_extend(&pvt->hst_rtp) < 0) {
297 return;
298 }
299 }
300 if (pvt->hst_rtcp.ulen == pvt->hst_rtcp.alen) {
301 if (rtpp_polltbl_hst_extend(&pvt->hst_rtcp) < 0) {
302 return;
303 }
304 }
305 rtp = sp->rtp->stream[index];
306 fd = CALL_SMETHOD(rtp, get_skt);
307 if (fd != NULL) {
308 rtpp_polltbl_hst_record(&pvt->hst_rtp, HST_DEL, rtp->stuid, NULL);
309 CALL_SMETHOD(fd->rcnt, decref);
310 }
311 rtcp = sp->rtcp->stream[index];
312 fd = CALL_SMETHOD(rtcp, get_skt);
313 if (fd != NULL) {
314 rtpp_polltbl_hst_record(&pvt->hst_rtcp, HST_DEL, rtcp->stuid, NULL);
315 CALL_SMETHOD(fd->rcnt, decref);
316 }
317
318 pthread_mutex_unlock(&pvt->lock);
319 }
320
321 void
rtpp_polltbl_free(struct rtpp_polltbl * ptbl)322 rtpp_polltbl_free(struct rtpp_polltbl *ptbl)
323 {
324 int i;
325
326 if (ptbl->aloclen == 0) {
327 return;
328 }
329 if (ptbl->curlen > 0) {
330 for (i = 0; i < ptbl->curlen; i++) {
331 CALL_SMETHOD(ptbl->mds[i].skt->rcnt, decref);
332 }
333 }
334 free(ptbl->pfds);
335 free(ptbl->mds);
336 }
337
338 static int
rtpp_sinfo_sync_polltbl(struct rtpp_sessinfo * sessinfo,struct rtpp_polltbl * ptbl,int pipe_type)339 rtpp_sinfo_sync_polltbl(struct rtpp_sessinfo *sessinfo,
340 struct rtpp_polltbl *ptbl, int pipe_type)
341 {
342 struct rtpp_sessinfo_priv *pvt;
343 struct pollfd *pfds;
344 struct rtpp_polltbl_mdata *mds;
345 struct rtpp_polltbl_hst *hp;
346 int i;
347
348 pvt = PUB2PVT(sessinfo);
349
350 pthread_mutex_lock(&pvt->lock);
351 hp = (pipe_type == PIPE_RTP) ? &pvt->hst_rtp : &pvt->hst_rtcp;
352
353 if (hp->ulen == 0) {
354 pthread_mutex_unlock(&pvt->lock);
355 return (0);
356 }
357
358 if (hp->ulen > ptbl->aloclen - ptbl->curlen) {
359 int alen = hp->ulen + ptbl->curlen;
360
361 pfds = realloc(ptbl->pfds, (alen * sizeof(struct pollfd)));
362 mds = realloc(ptbl->mds, (alen * sizeof(struct rtpp_polltbl_mdata)));
363 if (pfds != NULL)
364 ptbl->pfds = pfds;
365 if (mds != NULL)
366 ptbl->mds = mds;
367 if (pfds == NULL || mds == NULL) {
368 pthread_mutex_unlock(&pvt->lock);
369 return (-1);
370 }
371 ptbl->aloclen = alen;
372 }
373
374 for (i = 0; i < hp->ulen; i++) {
375 struct rtpp_polltbl_hst_ent *hep;
376 int session_index, movelen;
377
378 hep = hp->clog + i;
379 switch (hep->op) {
380 case HST_ADD:
381 #ifdef RTPP_DEBUG
382 assert(find_polltbl_idx(ptbl, hep->stuid) < 0);
383 #endif
384 session_index = ptbl->curlen;
385 ptbl->pfds[session_index].fd = CALL_METHOD(hep->skt, getfd);
386 ptbl->pfds[session_index].events = POLLIN;
387 ptbl->pfds[session_index].revents = 0;
388 ptbl->mds[session_index].stuid = hep->stuid;
389 ptbl->mds[session_index].skt = hep->skt;
390 ptbl->curlen++;
391 ptbl->revision++;
392 break;
393
394 case HST_DEL:
395 session_index = find_polltbl_idx(ptbl, hep->stuid);
396 assert(session_index > -1);
397 CALL_SMETHOD(ptbl->mds[session_index].skt->rcnt, decref);
398 movelen = (ptbl->curlen - session_index - 1);
399 if (movelen > 0) {
400 memmove(&ptbl->pfds[session_index], &ptbl->pfds[session_index + 1],
401 movelen * sizeof(ptbl->pfds[0]));
402 memmove(&ptbl->mds[session_index], &ptbl->mds[session_index + 1],
403 movelen * sizeof(ptbl->mds[0]));
404 }
405 ptbl->curlen--;
406 ptbl->revision++;
407 break;
408
409 case HST_UPD:
410 session_index = find_polltbl_idx(ptbl, hep->stuid);
411 assert(session_index > -1);
412 CALL_SMETHOD(ptbl->mds[session_index].skt->rcnt, decref);
413 ptbl->pfds[session_index].fd = CALL_METHOD(hep->skt, getfd);
414 ptbl->pfds[session_index].events = POLLIN;
415 ptbl->pfds[session_index].revents = 0;
416 ptbl->mds[session_index].skt = hep->skt;
417 ptbl->revision++;
418 break;
419 }
420 }
421 hp->ulen = 0;
422
423 ptbl->streams_wrt = hp->streams_wrt;
424 pthread_mutex_unlock(&pvt->lock);
425 return (1);
426 }
427