1 /*
2 * Copyright (c) 2007 Sippy Software, Inc., http://www.sippysoft.com
3 * All rights reserved.
4 *
5 * Redistribution and use in source and binary forms, with or without
6 * modification, are permitted provided that the following conditions
7 * are met:
8 * 1. Redistributions of source code must retain the above copyright
9 * notice, this list of conditions and the following disclaimer.
10 * 2. Redistributions in binary form must reproduce the above copyright
11 * notice, this list of conditions and the following disclaimer in the
12 * documentation and/or other materials provided with the distribution.
13 *
14 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17 * ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24 * SUCH DAMAGE.
25 *
26 */
27
28 #include <sys/socket.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <stdint.h>
32
33 #include "rtpp_endian.h"
34 #include "rtp.h"
35 #include "rtp_info.h"
36 #include "rtp_packet.h"
37 #include "rtp_resizer.h"
38 #include "rtpp_proc.h"
39 #include "rtpp_types.h"
40 #include "rtpp_stats.h"
41 #include "rtpp_mallocs.h"
42 #include "rtpp_ssrc.h"
43
44 struct rtp_resizer {
45 int nsamples_total;
46
47 int seq_initialized;
48 uint16_t seq;
49
50 struct rtpp_ssrc ssrc;
51
52 int last_sent_ts_inited;
53 uint32_t last_sent_ts;
54
55 int tsdelta_inited;
56 uint32_t tsdelta;
57
58 int output_nsamples;
59 int max_buf_nsamples;
60
61 struct {
62 struct rtp_packet *first;
63 struct rtp_packet *last;
64 } queue;
65 };
66
67 static int
min_nsamples(int codec_id)68 min_nsamples(int codec_id)
69 {
70
71 switch (codec_id)
72 {
73 case RTP_GSM:
74 return 160; /* 20ms */
75 case RTP_G723:
76 return 240; /* 30ms */
77 default:
78 return 80; /* 10ms */
79 }
80 }
81
82 struct rtp_resizer *
rtp_resizer_new(int output_ptime)83 rtp_resizer_new(int output_ptime)
84 {
85 struct rtp_resizer *this;
86
87 this = rtpp_zmalloc(sizeof(struct rtp_resizer));
88 if (this == NULL)
89 return (NULL);
90 rtp_resizer_set_ptime(this, output_ptime);
91 return (this);
92 }
93
94 void
rtp_resizer_free(struct rtpp_stats * rtpp_stats,struct rtp_resizer * this)95 rtp_resizer_free(struct rtpp_stats *rtpp_stats, struct rtp_resizer *this)
96 {
97 struct rtp_packet *p;
98 struct rtp_packet *p1;
99 int nfree;
100
101 nfree = 0;
102 p = this->queue.first;
103 while (p != NULL) {
104 p1 = p;
105 p = p->next;
106 rtp_packet_free(p1);
107 nfree++;
108 }
109 free(this);
110 if (nfree > 0) {
111 CALL_METHOD(rtpp_stats, updatebyname, "npkts_resizer_discard", nfree);
112 }
113 }
114
115 int
rtp_resizer_get_ptime(struct rtp_resizer * this)116 rtp_resizer_get_ptime(struct rtp_resizer *this)
117 {
118
119 return(this->output_nsamples / 8);
120 }
121
122 int
rtp_resizer_set_ptime(struct rtp_resizer * this,int ptime_new)123 rtp_resizer_set_ptime(struct rtp_resizer *this, int ptime_new)
124 {
125 int ptime_old;
126
127 ptime_old = rtp_resizer_get_ptime(this);
128 this->output_nsamples = ptime_new * 8;
129 this->max_buf_nsamples = this->output_nsamples * 2;
130 if (this->max_buf_nsamples < 320) {
131 this->max_buf_nsamples = 320;
132 }
133 return (ptime_old);
134 }
135
136 void
rtp_resizer_enqueue(struct rtp_resizer * this,struct rtp_packet ** pkt,struct rtpp_proc_rstats * rsp)137 rtp_resizer_enqueue(struct rtp_resizer *this, struct rtp_packet **pkt,
138 struct rtpp_proc_rstats *rsp)
139 {
140 struct rtp_packet *p;
141 uint32_t ref_ts, internal_ts;
142 int delta;
143
144 p = *pkt;
145 if (rtp_packet_parse(p) != RTP_PARSER_OK)
146 return;
147
148 if ((*pkt)->parsed->nsamples == RTP_NSAMPLES_UNKNOWN)
149 return;
150
151 if (!this->ssrc.inited) {
152 this->ssrc.val = p->parsed->ssrc;
153 this->ssrc.inited = 1;
154 } else if (this->ssrc.val != p->parsed->ssrc) {
155 /* SSRC has been changed, TS and SEQ are no longer contiuous */
156 this->ssrc.val = p->parsed->ssrc;
157 this->last_sent_ts_inited = 0;
158 this->tsdelta_inited = 0;
159 }
160
161 if (this->last_sent_ts_inited && ts_less((*pkt)->parsed->ts, this->last_sent_ts))
162 {
163 /* Packet arrived too late. Drop it. */
164 rtp_packet_free(*pkt);
165 *pkt = NULL;
166 rsp->npkts_resizer_discard.cnt++;
167 return;
168 }
169 internal_ts = (*pkt)->rtime * 8000.0;
170 if (!this->tsdelta_inited) {
171 this->tsdelta = (*pkt)->parsed->ts - internal_ts + 40;
172 this->tsdelta_inited = 1;
173 }
174 else {
175 ref_ts = internal_ts + this->tsdelta;
176 if (ts_less(ref_ts, (*pkt)->parsed->ts)) {
177 this->tsdelta = (*pkt)->parsed->ts - internal_ts + 40;
178 /* printf("Sync forward\n"); */
179 }
180 else if (ts_less((*pkt)->parsed->ts + this->output_nsamples + 160, ref_ts))
181 {
182 delta = (ref_ts - ((*pkt)->parsed->ts + this->output_nsamples + 160)) / 2;
183 this->tsdelta -= delta;
184 /* printf("Sync backward\n"); */
185 }
186 }
187 if (this->queue.last != NULL)
188 {
189 p = this->queue.last;
190 while (p != NULL && ts_less((*pkt)->parsed->ts, p->parsed->ts))
191 p = p->prev;
192
193 if (p == NULL) /* head reached */
194 {
195 (*pkt)->next = this->queue.first;
196 (*pkt)->prev = NULL;
197 this->queue.first->prev = *pkt;
198 this->queue.first = *pkt;
199 }
200 else if (p == this->queue.last) /* tail of the queue */
201 {
202 (*pkt)->prev = this->queue.last;
203 (*pkt)->next = NULL;
204 this->queue.last->next = *pkt;
205 this->queue.last = *pkt;
206 }
207 else { /* middle of the queue */
208 (*pkt)->next = p->next;
209 (*pkt)->prev = p;
210 (*pkt)->next->prev = (*pkt)->prev->next = *pkt;
211 }
212 }
213 else {
214 this->queue.first = this->queue.last = *pkt;
215 (*pkt)->prev = NULL;
216 (*pkt)->next = NULL;
217 }
218 this->nsamples_total += (*pkt)->parsed->nsamples;
219 *pkt = NULL; /* take control over the packet */
220 }
221
222 static void
detach_queue_head(struct rtp_resizer * this)223 detach_queue_head(struct rtp_resizer *this)
224 {
225
226 this->queue.first = this->queue.first->next;
227 if (this->queue.first == NULL)
228 this->queue.last = NULL;
229 else
230 this->queue.first->prev = NULL;
231 }
232
233 static void
append_packet(struct rtp_packet * dst,struct rtp_packet * src)234 append_packet(struct rtp_packet *dst, struct rtp_packet *src)
235 {
236
237 memcpy(&dst->data.buf[dst->parsed->data_offset + dst->parsed->data_size],
238 &src->data.buf[src->parsed->data_offset], src->parsed->data_size);
239 dst->parsed->nsamples += src->parsed->nsamples;
240 dst->parsed->data_size += src->parsed->data_size;
241 dst->size += src->parsed->data_size;
242 dst->parsed->appendable = src->parsed->appendable;
243 }
244
245 static void
append_chunk(struct rtp_packet * dst,struct rtp_packet * src,const struct rtp_packet_chunk * chunk)246 append_chunk(struct rtp_packet *dst, struct rtp_packet *src, const struct rtp_packet_chunk *chunk)
247 {
248
249 /* Copy chunk */
250 memcpy(&dst->data.buf[dst->parsed->data_offset + dst->parsed->data_size],
251 &src->data.buf[src->parsed->data_offset], chunk->bytes);
252 dst->parsed->nsamples += chunk->nsamples;
253 dst->parsed->data_size += chunk->bytes;
254 dst->size += chunk->bytes;
255
256 /* Truncate the source packet */
257 src->parsed->nsamples -= chunk->nsamples;
258 rtp_packet_set_ts(src, src->parsed->ts + chunk->nsamples);
259 src->parsed->data_size -= chunk->bytes;
260 src->size -= chunk->bytes;
261 memmove(&src->data.buf[src->parsed->data_offset],
262 &src->data.buf[src->parsed->data_offset + chunk->bytes], src->parsed->data_size);
263 }
264
265 static void
move_chunk(struct rtp_packet * dst,struct rtp_packet * src,const struct rtp_packet_chunk * chunk)266 move_chunk(struct rtp_packet *dst, struct rtp_packet *src, const struct rtp_packet_chunk *chunk)
267 {
268 /* Copy chunk */
269 memcpy(&dst->data.buf[dst->parsed->data_offset],
270 &src->data.buf[src->parsed->data_offset], chunk->bytes);
271 dst->parsed->nsamples = chunk->nsamples;
272 dst->parsed->data_size = chunk->bytes;
273 dst->size = dst->parsed->data_size + dst->parsed->data_offset;
274
275 /* Truncate the source packet */
276 src->parsed->nsamples -= chunk->nsamples;
277 rtp_packet_set_ts(src, src->parsed->ts + chunk->nsamples);
278 src->parsed->data_size -= chunk->bytes;
279 src->size -= chunk->bytes;
280 memmove(&src->data.buf[src->parsed->data_offset],
281 &src->data.buf[src->parsed->data_offset + chunk->bytes], src->parsed->data_size);
282 }
283
284 struct rtp_packet *
rtp_resizer_get(struct rtp_resizer * this,double dtime)285 rtp_resizer_get(struct rtp_resizer *this, double dtime)
286 {
287 struct rtp_packet *ret = NULL;
288 struct rtp_packet *p;
289 uint32_t ref_ts;
290 int count = 0;
291 int split = 0;
292 int nsamples_left;
293 int output_nsamples;
294 int min;
295 struct rtp_packet_chunk chunk;
296
297 if (this->queue.first == NULL)
298 return NULL;
299
300 ref_ts = (dtime * 8000.0) + this->tsdelta;
301
302 /* Wait untill enough data has arrived or timeout occured */
303 if (this->nsamples_total < this->output_nsamples &&
304 ts_less(ref_ts, this->queue.first->parsed->ts + this->max_buf_nsamples))
305 {
306 return NULL;
307 }
308
309 output_nsamples = this->output_nsamples;
310 min = min_nsamples(this->queue.first->data.header.pt);
311 if (output_nsamples < min) {
312 output_nsamples = min;
313 } else if (output_nsamples % min != 0) {
314 output_nsamples += (min - (output_nsamples % min));
315 }
316
317 /* Aggregate the output packet */
318 while ((ret == NULL || ret->parsed->nsamples < output_nsamples) && this->queue.first != NULL)
319 {
320 p = this->queue.first;
321 if (ret == NULL)
322 {
323 /* Look if the first packet is to be split */
324 if (p->parsed->nsamples > output_nsamples) {
325 rtp_packet_first_chunk_find(p, &chunk, output_nsamples);
326 if (chunk.whole_packet_matched) {
327 ret = p;
328 detach_queue_head(this);
329 } else {
330 ret = rtp_packet_alloc();
331 if (ret == NULL)
332 break;
333 rtp_packet_dup(ret, p, RTPP_DUP_HDRONLY);
334 move_chunk(ret, p, &chunk);
335 ++split;
336 }
337 if (!this->seq_initialized) {
338 this->seq = ret->parsed->seq;
339 this->seq_initialized = 1;
340 }
341 ++count;
342 break;
343 }
344 }
345 else /* ret != NULL */
346 {
347 /* detect holes and payload changes in RTP stream */
348 if ((ret->parsed->ts + ret->parsed->nsamples) != p->parsed->ts ||
349 ret->data.header.pt != p->data.header.pt)
350 {
351 break;
352 }
353 nsamples_left = output_nsamples - ret->parsed->nsamples;
354
355 /* Break the input packet into pieces to create output packet
356 * of specified size */
357 if (nsamples_left > 0 && nsamples_left < p->parsed->nsamples) {
358 rtp_packet_first_chunk_find(p, &chunk, nsamples_left);
359 if (chunk.whole_packet_matched) {
360 /* Prevent RTP packet buffer overflow */
361 if ((ret->size + p->parsed->data_size) > sizeof(ret->data.buf))
362 break;
363 append_packet(ret, p);
364 detach_queue_head(this);
365 rtp_packet_free(p);
366 }
367 else {
368 /* Prevent RTP packet buffer overflow */
369 if ((ret->size + chunk.bytes) > sizeof(ret->data.buf))
370 break;
371 /* Append chunk to output */
372 append_chunk(ret, p, &chunk);
373 ++split;
374 }
375 ++count;
376 break;
377 }
378 }
379 ++count;
380
381 /*
382 * Prevent RTP packet buffer overflow
383 */
384 if (ret != NULL && (ret->size + p->parsed->data_size) > sizeof(ret->data.buf))
385 break;
386
387 /* Detach head packet from the queue */
388 detach_queue_head(this);
389
390 /*
391 * Add the packet to the output
392 */
393 if (ret == NULL) {
394 ret = p; /* use the first packet as the result container */
395 if (!this->seq_initialized) {
396 this->seq = p->parsed->seq;
397 this->seq_initialized = 1;
398 }
399 }
400 else {
401 append_packet(ret, p);
402 rtp_packet_free(p);
403 }
404 /* Send non-appendable packet immediately */
405 if (!ret->parsed->appendable)
406 break;
407 }
408 if (ret != NULL) {
409 this->nsamples_total -= ret->parsed->nsamples;
410 rtp_packet_set_seq(ret, this->seq);
411 ++this->seq;
412 this->last_sent_ts_inited = 1;
413 this->last_sent_ts = ret->parsed->ts + ret->parsed->nsamples;
414 /*
415 printf("Payload %d, %d packets aggregated, %d splits done, final size %dms\n", ret->data.header.pt, count, split, ret->parsed->nsamples / 8);
416 */
417 }
418 return ret;
419 }
420