1 /*
2  * Copyright (c) 2007 Sippy Software, Inc., http://www.sippysoft.com
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  */
27 
28 #include <sys/socket.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <stdint.h>
32 
33 #include "rtpp_endian.h"
34 #include "rtp.h"
35 #include "rtp_info.h"
36 #include "rtp_packet.h"
37 #include "rtp_resizer.h"
38 #include "rtpp_proc.h"
39 #include "rtpp_types.h"
40 #include "rtpp_stats.h"
41 #include "rtpp_mallocs.h"
42 #include "rtpp_ssrc.h"
43 
44 struct rtp_resizer {
45     int         nsamples_total;
46 
47     int         seq_initialized;
48     uint16_t    seq;
49 
50     struct rtpp_ssrc ssrc;
51 
52     int         last_sent_ts_inited;
53     uint32_t    last_sent_ts;
54 
55     int         tsdelta_inited;
56     uint32_t    tsdelta;
57 
58     int         output_nsamples;
59     int         max_buf_nsamples;
60 
61     struct {
62         struct rtp_packet *first;
63         struct rtp_packet *last;
64     } queue;
65 };
66 
67 static int
min_nsamples(int codec_id)68 min_nsamples(int codec_id)
69 {
70 
71     switch (codec_id)
72     {
73     case RTP_GSM:
74         return 160; /* 20ms */
75     case RTP_G723:
76         return 240; /* 30ms */
77     default:
78         return 80; /* 10ms */
79     }
80 }
81 
82 struct rtp_resizer *
rtp_resizer_new(int output_ptime)83 rtp_resizer_new(int output_ptime)
84 {
85     struct rtp_resizer *this;
86 
87     this = rtpp_zmalloc(sizeof(struct rtp_resizer));
88     if (this == NULL)
89         return (NULL);
90     rtp_resizer_set_ptime(this, output_ptime);
91     return (this);
92 }
93 
94 void
rtp_resizer_free(struct rtpp_stats * rtpp_stats,struct rtp_resizer * this)95 rtp_resizer_free(struct rtpp_stats *rtpp_stats, struct rtp_resizer *this)
96 {
97     struct rtp_packet *p;
98     struct rtp_packet *p1;
99     int nfree;
100 
101     nfree = 0;
102     p = this->queue.first;
103     while (p != NULL) {
104         p1 = p;
105         p = p->next;
106         rtp_packet_free(p1);
107         nfree++;
108     }
109     free(this);
110     if (nfree > 0) {
111         CALL_METHOD(rtpp_stats, updatebyname, "npkts_resizer_discard", nfree);
112     }
113 }
114 
115 int
rtp_resizer_get_ptime(struct rtp_resizer * this)116 rtp_resizer_get_ptime(struct rtp_resizer *this)
117 {
118 
119     return(this->output_nsamples / 8);
120 }
121 
122 int
rtp_resizer_set_ptime(struct rtp_resizer * this,int ptime_new)123 rtp_resizer_set_ptime(struct rtp_resizer *this, int ptime_new)
124 {
125     int ptime_old;
126 
127     ptime_old = rtp_resizer_get_ptime(this);
128     this->output_nsamples = ptime_new * 8;
129     this->max_buf_nsamples = this->output_nsamples * 2;
130     if (this->max_buf_nsamples < 320) {
131         this->max_buf_nsamples = 320;
132     }
133     return (ptime_old);
134 }
135 
136 void
rtp_resizer_enqueue(struct rtp_resizer * this,struct rtp_packet ** pkt,struct rtpp_proc_rstats * rsp)137 rtp_resizer_enqueue(struct rtp_resizer *this, struct rtp_packet **pkt,
138   struct rtpp_proc_rstats *rsp)
139 {
140     struct rtp_packet   *p;
141     uint32_t            ref_ts, internal_ts;
142     int                 delta;
143 
144     p = *pkt;
145     if (rtp_packet_parse(p) != RTP_PARSER_OK)
146         return;
147 
148     if ((*pkt)->parsed->nsamples == RTP_NSAMPLES_UNKNOWN)
149         return;
150 
151     if (!this->ssrc.inited) {
152         this->ssrc.val = p->parsed->ssrc;
153         this->ssrc.inited = 1;
154     } else if (this->ssrc.val != p->parsed->ssrc) {
155         /* SSRC has been changed, TS and SEQ are no longer contiuous */
156         this->ssrc.val = p->parsed->ssrc;
157         this->last_sent_ts_inited = 0;
158         this->tsdelta_inited = 0;
159     }
160 
161     if (this->last_sent_ts_inited && ts_less((*pkt)->parsed->ts, this->last_sent_ts))
162     {
163         /* Packet arrived too late. Drop it. */
164         rtp_packet_free(*pkt);
165         *pkt = NULL;
166         rsp->npkts_resizer_discard.cnt++;
167         return;
168     }
169     internal_ts = (*pkt)->rtime * 8000.0;
170     if (!this->tsdelta_inited) {
171         this->tsdelta = (*pkt)->parsed->ts - internal_ts + 40;
172         this->tsdelta_inited = 1;
173     }
174     else {
175         ref_ts = internal_ts + this->tsdelta;
176         if (ts_less(ref_ts, (*pkt)->parsed->ts)) {
177             this->tsdelta = (*pkt)->parsed->ts - internal_ts + 40;
178 /*            printf("Sync forward\n"); */
179         }
180         else if (ts_less((*pkt)->parsed->ts + this->output_nsamples + 160, ref_ts))
181         {
182             delta = (ref_ts - ((*pkt)->parsed->ts + this->output_nsamples + 160)) / 2;
183             this->tsdelta -= delta;
184 /*            printf("Sync backward\n"); */
185         }
186     }
187     if (this->queue.last != NULL)
188     {
189         p = this->queue.last;
190         while (p != NULL && ts_less((*pkt)->parsed->ts, p->parsed->ts))
191              p = p->prev;
192 
193         if (p == NULL) /* head reached */
194         {
195             (*pkt)->next = this->queue.first;
196             (*pkt)->prev = NULL;
197             this->queue.first->prev = *pkt;
198             this->queue.first = *pkt;
199         }
200         else if (p == this->queue.last) /* tail of the queue */
201         {
202             (*pkt)->prev = this->queue.last;
203             (*pkt)->next = NULL;
204             this->queue.last->next = *pkt;
205             this->queue.last = *pkt;
206         }
207         else { /* middle of the queue */
208             (*pkt)->next = p->next;
209             (*pkt)->prev = p;
210             (*pkt)->next->prev = (*pkt)->prev->next = *pkt;
211         }
212     }
213     else {
214         this->queue.first = this->queue.last = *pkt;
215         (*pkt)->prev = NULL;
216 	(*pkt)->next = NULL;
217     }
218     this->nsamples_total += (*pkt)->parsed->nsamples;
219     *pkt = NULL; /* take control over the packet */
220 }
221 
222 static void
detach_queue_head(struct rtp_resizer * this)223 detach_queue_head(struct rtp_resizer *this)
224 {
225 
226     this->queue.first = this->queue.first->next;
227     if (this->queue.first == NULL)
228 	this->queue.last = NULL;
229     else
230 	this->queue.first->prev = NULL;
231 }
232 
233 static void
append_packet(struct rtp_packet * dst,struct rtp_packet * src)234 append_packet(struct rtp_packet *dst, struct rtp_packet *src)
235 {
236 
237     memcpy(&dst->data.buf[dst->parsed->data_offset + dst->parsed->data_size],
238       &src->data.buf[src->parsed->data_offset], src->parsed->data_size);
239     dst->parsed->nsamples += src->parsed->nsamples;
240     dst->parsed->data_size += src->parsed->data_size;
241     dst->size += src->parsed->data_size;
242     dst->parsed->appendable = src->parsed->appendable;
243 }
244 
245 static void
append_chunk(struct rtp_packet * dst,struct rtp_packet * src,const struct rtp_packet_chunk * chunk)246 append_chunk(struct rtp_packet *dst, struct rtp_packet *src, const struct rtp_packet_chunk *chunk)
247 {
248 
249     /* Copy chunk */
250     memcpy(&dst->data.buf[dst->parsed->data_offset + dst->parsed->data_size],
251       &src->data.buf[src->parsed->data_offset], chunk->bytes);
252     dst->parsed->nsamples += chunk->nsamples;
253     dst->parsed->data_size += chunk->bytes;
254     dst->size += chunk->bytes;
255 
256     /* Truncate the source packet */
257     src->parsed->nsamples -= chunk->nsamples;
258     rtp_packet_set_ts(src, src->parsed->ts + chunk->nsamples);
259     src->parsed->data_size -= chunk->bytes;
260     src->size -= chunk->bytes;
261     memmove(&src->data.buf[src->parsed->data_offset],
262       &src->data.buf[src->parsed->data_offset + chunk->bytes], src->parsed->data_size);
263 }
264 
265 static void
move_chunk(struct rtp_packet * dst,struct rtp_packet * src,const struct rtp_packet_chunk * chunk)266 move_chunk(struct rtp_packet *dst, struct rtp_packet *src, const struct rtp_packet_chunk *chunk)
267 {
268     /* Copy chunk */
269     memcpy(&dst->data.buf[dst->parsed->data_offset],
270       &src->data.buf[src->parsed->data_offset], chunk->bytes);
271     dst->parsed->nsamples = chunk->nsamples;
272     dst->parsed->data_size = chunk->bytes;
273     dst->size = dst->parsed->data_size + dst->parsed->data_offset;
274 
275     /* Truncate the source packet */
276     src->parsed->nsamples -= chunk->nsamples;
277     rtp_packet_set_ts(src, src->parsed->ts + chunk->nsamples);
278     src->parsed->data_size -= chunk->bytes;
279     src->size -= chunk->bytes;
280     memmove(&src->data.buf[src->parsed->data_offset],
281       &src->data.buf[src->parsed->data_offset + chunk->bytes], src->parsed->data_size);
282 }
283 
284 struct rtp_packet *
rtp_resizer_get(struct rtp_resizer * this,double dtime)285 rtp_resizer_get(struct rtp_resizer *this, double dtime)
286 {
287     struct rtp_packet *ret = NULL;
288     struct rtp_packet *p;
289     uint32_t    ref_ts;
290     int         count = 0;
291     int         split = 0;
292     int         nsamples_left;
293     int         output_nsamples;
294     int         min;
295     struct      rtp_packet_chunk chunk;
296 
297     if (this->queue.first == NULL)
298         return NULL;
299 
300     ref_ts = (dtime * 8000.0) + this->tsdelta;
301 
302     /* Wait untill enough data has arrived or timeout occured */
303     if (this->nsamples_total < this->output_nsamples &&
304         ts_less(ref_ts, this->queue.first->parsed->ts + this->max_buf_nsamples))
305     {
306         return NULL;
307     }
308 
309     output_nsamples = this->output_nsamples;
310     min = min_nsamples(this->queue.first->data.header.pt);
311     if (output_nsamples < min) {
312         output_nsamples = min;
313     } else if (output_nsamples % min != 0) {
314         output_nsamples += (min - (output_nsamples % min));
315     }
316 
317     /* Aggregate the output packet */
318     while ((ret == NULL || ret->parsed->nsamples < output_nsamples) && this->queue.first != NULL)
319     {
320         p = this->queue.first;
321         if (ret == NULL)
322         {
323             /* Look if the first packet is to be split */
324             if (p->parsed->nsamples > output_nsamples) {
325 		rtp_packet_first_chunk_find(p, &chunk, output_nsamples);
326 		if (chunk.whole_packet_matched) {
327 		    ret = p;
328 		    detach_queue_head(this);
329 		} else {
330 		    ret = rtp_packet_alloc();
331 		    if (ret == NULL)
332 			break;
333 		    rtp_packet_dup(ret, p, RTPP_DUP_HDRONLY);
334 		    move_chunk(ret, p, &chunk);
335 		    ++split;
336 		}
337 		if (!this->seq_initialized) {
338 		    this->seq = ret->parsed->seq;
339 		    this->seq_initialized = 1;
340 		}
341 		++count;
342 		break;
343 	    }
344         }
345         else /* ret != NULL */
346         {
347             /* detect holes and payload changes in RTP stream */
348             if ((ret->parsed->ts + ret->parsed->nsamples) != p->parsed->ts ||
349                 ret->data.header.pt != p->data.header.pt)
350             {
351                 break;
352             }
353             nsamples_left = output_nsamples - ret->parsed->nsamples;
354 
355             /* Break the input packet into pieces to create output packet
356              * of specified size */
357             if (nsamples_left > 0 && nsamples_left < p->parsed->nsamples) {
358 		rtp_packet_first_chunk_find(p, &chunk, nsamples_left);
359 		if (chunk.whole_packet_matched) {
360 		    /* Prevent RTP packet buffer overflow */
361 		    if ((ret->size + p->parsed->data_size) > sizeof(ret->data.buf))
362 			break;
363 		    append_packet(ret, p);
364 		    detach_queue_head(this);
365 		    rtp_packet_free(p);
366 		}
367 		else {
368 		    /* Prevent RTP packet buffer overflow */
369 		    if ((ret->size + chunk.bytes) > sizeof(ret->data.buf))
370 			break;
371 		    /* Append chunk to output */
372 		    append_chunk(ret, p, &chunk);
373 		    ++split;
374 		}
375 		++count;
376 		break;
377             }
378         }
379         ++count;
380 
381         /*
382          * Prevent RTP packet buffer overflow
383          */
384         if (ret != NULL && (ret->size + p->parsed->data_size) > sizeof(ret->data.buf))
385             break;
386 
387         /* Detach head packet from the queue */
388 	detach_queue_head(this);
389 
390         /*
391          * Add the packet to the output
392          */
393         if (ret == NULL) {
394             ret = p; /* use the first packet as the result container */
395             if (!this->seq_initialized) {
396                 this->seq = p->parsed->seq;
397                 this->seq_initialized = 1;
398             }
399         }
400         else {
401 	    append_packet(ret, p);
402             rtp_packet_free(p);
403         }
404 	/* Send non-appendable packet immediately */
405 	if (!ret->parsed->appendable)
406 	    break;
407     }
408     if (ret != NULL) {
409 	this->nsamples_total -= ret->parsed->nsamples;
410 	rtp_packet_set_seq(ret, this->seq);
411 	++this->seq;
412 	this->last_sent_ts_inited = 1;
413 	this->last_sent_ts = ret->parsed->ts + ret->parsed->nsamples;
414 /*
415 	printf("Payload %d, %d packets aggregated, %d splits done, final size %dms\n", ret->data.header.pt, count, split, ret->parsed->nsamples / 8);
416 */
417     }
418     return ret;
419 }
420