1 /*
2 * Copyright 2008-2014 Arsen Chaloyan
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 *
16 * $Id: mpf_rtp_stream.c 2136 2014-07-04 06:33:36Z achaloyan@gmail.com $
17 */
18
19 #include <apr_network_io.h>
20 #include "apt_net.h"
21 #include "apt_timer_queue.h"
22 #include "mpf_rtp_stream.h"
23 #include "mpf_termination.h"
24 #include "mpf_codec_manager.h"
25 #include "mpf_rtp_header.h"
26 #include "mpf_rtcp_packet.h"
27 #include "mpf_rtp_defs.h"
28 #include "mpf_rtp_pt.h"
29 #include "mpf_trace.h"
30 #include "apt_log.h"
31
32 /** Max size of RTP packet */
33 #define MAX_RTP_PACKET_SIZE 1500
34 /** Max size of RTCP packet */
35 #define MAX_RTCP_PACKET_SIZE 1500
36
37 /* Reason strings used in RTCP BYE messages (informative only) */
38 #define RTCP_BYE_SESSION_ENDED "Session ended"
39 #define RTCP_BYE_TALKSPURT_ENDED "Talskpurt ended"
40
41 #if ENABLE_RTP_PACKET_TRACE == 1
42 #define RTP_TRACE printf
43 #elif ENABLE_RTP_PACKET_TRACE == 2
44 #define RTP_TRACE mpf_debug_output_trace
45 #else
46 #define RTP_TRACE mpf_null_trace
47 #endif
48
49 /** RTP stream */
50 typedef struct mpf_rtp_stream_t mpf_rtp_stream_t;
51 struct mpf_rtp_stream_t {
52 mpf_audio_stream_t *base;
53
54 mpf_rtp_media_descriptor_t *local_media;
55 mpf_rtp_media_descriptor_t *remote_media;
56 mpf_media_state_e state;
57
58 rtp_transmitter_t transmitter;
59 rtp_receiver_t receiver;
60
61 mpf_rtp_config_t *config;
62 mpf_rtp_settings_t *settings;
63
64 apr_socket_t *rtp_socket;
65 apr_socket_t *rtcp_socket;
66 apr_sockaddr_t *rtp_l_sockaddr;
67 apr_sockaddr_t *rtp_r_sockaddr;
68 apr_sockaddr_t *rtcp_l_sockaddr;
69 apr_sockaddr_t *rtcp_r_sockaddr;
70
71 apt_timer_t *rtcp_tx_timer;
72 apt_timer_t *rtcp_rx_timer;
73
74 apr_pool_t *pool;
75 };
76
77 static apt_bool_t mpf_rtp_stream_destroy(mpf_audio_stream_t *stream);
78 static apt_bool_t mpf_rtp_rx_stream_open(mpf_audio_stream_t *stream, mpf_codec_t *codec);
79 static apt_bool_t mpf_rtp_rx_stream_close(mpf_audio_stream_t *stream);
80 static apt_bool_t mpf_rtp_stream_receive(mpf_audio_stream_t *stream, mpf_frame_t *frame);
81 static apt_bool_t mpf_rtp_tx_stream_open(mpf_audio_stream_t *stream, mpf_codec_t *codec);
82 static apt_bool_t mpf_rtp_tx_stream_close(mpf_audio_stream_t *stream);
83 static apt_bool_t mpf_rtp_stream_transmit(mpf_audio_stream_t *stream, const mpf_frame_t *frame);
84
85 static const mpf_audio_stream_vtable_t vtable = {
86 mpf_rtp_stream_destroy,
87 mpf_rtp_rx_stream_open,
88 mpf_rtp_rx_stream_close,
89 mpf_rtp_stream_receive,
90 mpf_rtp_tx_stream_open,
91 mpf_rtp_tx_stream_close,
92 mpf_rtp_stream_transmit,
93 NULL /* mpf_rtp_stream_trace */
94 };
95
96 static apt_bool_t mpf_rtp_socket_pair_create(mpf_rtp_stream_t *stream, mpf_rtp_media_descriptor_t *local_media, apt_bool_t bind);
97 static apt_bool_t mpf_rtp_socket_pair_bind(mpf_rtp_stream_t *stream, mpf_rtp_media_descriptor_t *local_media);
98 static void mpf_rtp_socket_pair_close(mpf_rtp_stream_t *stream);
99
100 static apt_bool_t mpf_rtcp_report_send(mpf_rtp_stream_t *stream);
101 static apt_bool_t mpf_rtcp_bye_send(mpf_rtp_stream_t *stream, apt_str_t *reason);
102 static void mpf_rtcp_tx_timer_proc(apt_timer_t *timer, void *obj);
103 static void mpf_rtcp_rx_timer_proc(apt_timer_t *timer, void *obj);
104
105
mpf_rtp_stream_create(mpf_termination_t * termination,mpf_rtp_config_t * config,mpf_rtp_settings_t * settings,apr_pool_t * pool)106 MPF_DECLARE(mpf_audio_stream_t*) mpf_rtp_stream_create(mpf_termination_t *termination, mpf_rtp_config_t *config, mpf_rtp_settings_t *settings, apr_pool_t *pool)
107 {
108 mpf_rtp_stream_t *rtp_stream = apr_palloc(pool,sizeof(mpf_rtp_stream_t));
109 mpf_stream_capabilities_t *capabilities = mpf_stream_capabilities_create(STREAM_DIRECTION_DUPLEX,pool);
110 mpf_audio_stream_t *audio_stream = mpf_audio_stream_create(rtp_stream,&vtable,capabilities,pool);
111 if(!audio_stream) {
112 return NULL;
113 }
114
115 audio_stream->direction = STREAM_DIRECTION_NONE;
116 audio_stream->termination = termination;
117
118 rtp_stream->base = audio_stream;
119 rtp_stream->pool = pool;
120 rtp_stream->config = config;
121 rtp_stream->settings = settings;
122 rtp_stream->local_media = NULL;
123 rtp_stream->remote_media = NULL;
124 rtp_stream->rtp_socket = NULL;
125 rtp_stream->rtcp_socket = NULL;
126 rtp_stream->rtp_l_sockaddr = NULL;
127 rtp_stream->rtp_r_sockaddr = NULL;
128 rtp_stream->rtcp_l_sockaddr = NULL;
129 rtp_stream->rtcp_r_sockaddr = NULL;
130 rtp_stream->rtcp_tx_timer = NULL;
131 rtp_stream->rtcp_rx_timer = NULL;
132 rtp_stream->state = MPF_MEDIA_DISABLED;
133 rtp_receiver_init(&rtp_stream->receiver);
134 rtp_transmitter_init(&rtp_stream->transmitter);
135 rtp_stream->transmitter.sr_stat.ssrc = (apr_uint32_t)apr_time_now();
136
137 if(settings->rtcp == TRUE) {
138 if(settings->rtcp_tx_interval) {
139 rtp_stream->rtcp_tx_timer = apt_timer_create(
140 termination->timer_queue,
141 mpf_rtcp_tx_timer_proc,
142 rtp_stream, pool);
143 }
144 if(settings->rtcp_rx_resolution) {
145 rtp_stream->rtcp_rx_timer = apt_timer_create(
146 termination->timer_queue,
147 mpf_rtcp_rx_timer_proc,
148 rtp_stream, pool);
149 }
150 }
151
152 return audio_stream;
153 }
154
mpf_rtp_stream_local_media_create(mpf_rtp_stream_t * rtp_stream,mpf_rtp_media_descriptor_t * local_media,mpf_rtp_media_descriptor_t * remote_media,mpf_stream_capabilities_t * capabilities)155 static apt_bool_t mpf_rtp_stream_local_media_create(mpf_rtp_stream_t *rtp_stream, mpf_rtp_media_descriptor_t *local_media, mpf_rtp_media_descriptor_t *remote_media, mpf_stream_capabilities_t *capabilities)
156 {
157 apt_bool_t status = TRUE;
158 if(!local_media) {
159 /* local media is not specified, create the default one */
160 local_media = apr_palloc(rtp_stream->pool,sizeof(mpf_rtp_media_descriptor_t));
161 mpf_rtp_media_descriptor_init(local_media);
162 local_media->state = MPF_MEDIA_ENABLED;
163 local_media->direction = STREAM_DIRECTION_DUPLEX;
164 }
165 if(remote_media) {
166 local_media->id = remote_media->id;
167 }
168 if(local_media->ip.length == 0) {
169 local_media->ip = rtp_stream->config->ip;
170 local_media->ext_ip = rtp_stream->config->ext_ip;
171 }
172 if(local_media->port == 0) {
173 if(mpf_rtp_socket_pair_create(rtp_stream,local_media,FALSE) == TRUE) {
174 /* RTP port management */
175 mpf_rtp_config_t *rtp_config = rtp_stream->config;
176 apr_port_t first_port_in_search = rtp_config->rtp_port_cur;
177 apt_bool_t is_port_ok = FALSE;
178 do {
179 local_media->port = rtp_config->rtp_port_cur;
180 rtp_config->rtp_port_cur += 2;
181 if(rtp_config->rtp_port_cur == rtp_config->rtp_port_max) {
182 rtp_config->rtp_port_cur = rtp_config->rtp_port_min;
183 }
184
185 if(mpf_rtp_socket_pair_bind(rtp_stream,local_media) == TRUE) {
186 is_port_ok = TRUE;
187 break;
188 }
189 } while(first_port_in_search != rtp_config->rtp_port_cur);
190
191 if(is_port_ok == FALSE) {
192 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Find Free RTP Port %s:[%hu,%hu]",
193 rtp_config->ip.buf,
194 rtp_config->rtp_port_min,
195 rtp_config->rtp_port_max);
196 mpf_rtp_socket_pair_close(rtp_stream);
197 status = FALSE;
198 }
199 }
200 else {
201 status = FALSE;
202 }
203 }
204 else if(mpf_rtp_socket_pair_create(rtp_stream,local_media,TRUE) == FALSE) {
205 status = FALSE;
206 }
207
208 if(status == FALSE) {
209 local_media->state = MPF_MEDIA_DISABLED;
210 }
211
212 if(rtp_stream->settings->ptime) {
213 local_media->ptime = rtp_stream->settings->ptime;
214 }
215
216 if(mpf_codec_list_is_empty(&local_media->codec_list) == TRUE) {
217 if(mpf_codec_list_is_empty(&rtp_stream->settings->codec_list) == TRUE) {
218 mpf_codec_manager_codec_list_get(
219 rtp_stream->base->termination->codec_manager,
220 &local_media->codec_list,
221 rtp_stream->pool);
222 }
223 else {
224 mpf_codec_list_copy(&local_media->codec_list,
225 &rtp_stream->settings->codec_list,
226 rtp_stream->pool);
227 }
228 }
229
230 if(capabilities) {
231 if(mpf_codec_list_match(&local_media->codec_list,&capabilities->codecs) == FALSE) {
232 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Match Codec List %s:%hu",
233 local_media->ip.buf,
234 local_media->port);
235 local_media->state = MPF_MEDIA_DISABLED;
236 status = FALSE;
237 }
238 }
239
240 rtp_stream->local_media = local_media;
241 return status;
242 }
243
mpf_rtp_stream_local_media_update(mpf_rtp_stream_t * rtp_stream,mpf_rtp_media_descriptor_t * media,mpf_stream_capabilities_t * capabilities)244 static apt_bool_t mpf_rtp_stream_local_media_update(mpf_rtp_stream_t *rtp_stream, mpf_rtp_media_descriptor_t *media, mpf_stream_capabilities_t *capabilities)
245 {
246 apt_bool_t status = TRUE;
247 if(apt_string_compare(&rtp_stream->local_media->ip,&media->ip) == FALSE ||
248 rtp_stream->local_media->port != media->port) {
249
250 mpf_rtp_socket_pair_close(rtp_stream);
251
252 if(mpf_rtp_socket_pair_create(rtp_stream,media,TRUE) == FALSE) {
253 media->state = MPF_MEDIA_DISABLED;
254 status = FALSE;
255 }
256 }
257 if(mpf_codec_list_is_empty(&media->codec_list) == TRUE) {
258 mpf_codec_manager_codec_list_get(
259 rtp_stream->base->termination->codec_manager,
260 &media->codec_list,
261 rtp_stream->pool);
262 }
263
264 if(capabilities) {
265 if(mpf_codec_list_match(&media->codec_list,&capabilities->codecs) == FALSE) {
266 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Match Codec List %s:%hu",
267 media->ip.buf,
268 media->port);
269 media->state = MPF_MEDIA_DISABLED;
270 status = FALSE;
271 }
272 }
273
274 rtp_stream->local_media = media;
275 return status;
276 }
277
mpf_rtp_stream_remote_media_update(mpf_rtp_stream_t * rtp_stream,mpf_rtp_media_descriptor_t * media)278 static apt_bool_t mpf_rtp_stream_remote_media_update(mpf_rtp_stream_t *rtp_stream, mpf_rtp_media_descriptor_t *media)
279 {
280 apt_bool_t status = TRUE;
281 if(media->state == MPF_MEDIA_ENABLED) {
282 if(!rtp_stream->remote_media ||
283 apt_string_compare(&rtp_stream->remote_media->ip,&media->ip) == FALSE ||
284 rtp_stream->remote_media->port != media->port) {
285
286 /* update RTP port */
287 rtp_stream->rtp_r_sockaddr = NULL;
288 apr_sockaddr_info_get(
289 &rtp_stream->rtp_r_sockaddr,
290 media->ip.buf,
291 APR_INET,
292 media->port,
293 0,
294 rtp_stream->pool);
295 if(!rtp_stream->rtp_r_sockaddr) {
296 status = FALSE;
297 }
298
299 /* update RTCP port */
300 rtp_stream->rtcp_r_sockaddr = NULL;
301 apr_sockaddr_info_get(
302 &rtp_stream->rtcp_r_sockaddr,
303 media->ip.buf,
304 APR_INET,
305 media->port+1,
306 0,
307 rtp_stream->pool);
308 }
309 }
310
311 rtp_stream->remote_media = media;
312 return status;
313 }
314
mpf_rtp_stream_media_negotiate(mpf_rtp_stream_t * rtp_stream)315 static apt_bool_t mpf_rtp_stream_media_negotiate(mpf_rtp_stream_t *rtp_stream)
316 {
317 mpf_rtp_media_descriptor_t *local_media = rtp_stream->local_media;
318 mpf_rtp_media_descriptor_t *remote_media = rtp_stream->remote_media;
319 if(!local_media || !remote_media) {
320 return FALSE;
321 }
322
323 local_media->id = remote_media->id;
324 local_media->mid = remote_media->mid;
325 local_media->ptime = remote_media->ptime;
326
327 if(rtp_stream->state == MPF_MEDIA_DISABLED && remote_media->state == MPF_MEDIA_ENABLED) {
328 /* enable RTP/RTCP session */
329 rtp_stream->state = MPF_MEDIA_ENABLED;
330 if(rtp_stream->rtp_l_sockaddr) {
331 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Enable RTP Session %s:%hu",
332 rtp_stream->rtp_l_sockaddr->hostname,
333 rtp_stream->rtp_l_sockaddr->port);
334 }
335
336 if(rtp_stream->rtcp_tx_timer) {
337 apt_timer_set(rtp_stream->rtcp_tx_timer,rtp_stream->settings->rtcp_tx_interval);
338 }
339 if(rtp_stream->rtcp_rx_timer) {
340 apt_timer_set(rtp_stream->rtcp_rx_timer,rtp_stream->settings->rtcp_rx_resolution);
341 }
342 }
343 else if(rtp_stream->state == MPF_MEDIA_ENABLED && remote_media->state == MPF_MEDIA_DISABLED) {
344 /* disable RTP/RTCP session */
345 rtp_stream->state = MPF_MEDIA_DISABLED;
346 if(rtp_stream->rtp_l_sockaddr) {
347 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Disable RTP Session %s:%hu",
348 rtp_stream->rtp_l_sockaddr->hostname,
349 rtp_stream->rtp_l_sockaddr->port);
350 }
351
352 if(rtp_stream->rtcp_tx_timer) {
353 apt_timer_kill(rtp_stream->rtcp_tx_timer);
354 }
355 if(rtp_stream->rtcp_rx_timer) {
356 apt_timer_kill(rtp_stream->rtcp_rx_timer);
357 }
358 if(rtp_stream->settings->rtcp == TRUE && rtp_stream->settings->rtcp_bye_policy != RTCP_BYE_DISABLE) {
359 apt_str_t reason = {RTCP_BYE_SESSION_ENDED, sizeof(RTCP_BYE_SESSION_ENDED)-1};
360 mpf_rtcp_bye_send(rtp_stream,&reason);
361 }
362 }
363
364 local_media->state = remote_media->state;
365 local_media->direction = mpf_stream_reverse_direction_get(remote_media->direction);
366
367 if(remote_media->state == MPF_MEDIA_ENABLED) {
368 mpf_codec_list_t *codec_list1 = NULL;
369 mpf_codec_list_t *codec_list2 = NULL;
370
371 /* intersect local and remote codecs */
372 if(rtp_stream->settings->own_preferrence == TRUE) {
373 codec_list1 = &local_media->codec_list;
374 codec_list2 = &remote_media->codec_list;
375 }
376 else {
377 codec_list2 = &local_media->codec_list;
378 codec_list1 = &remote_media->codec_list;
379 }
380
381 if(mpf_codec_lists_intersect(codec_list1,codec_list2) == FALSE) {
382 /* reject RTP/RTCP session */
383 rtp_stream->state = MPF_MEDIA_DISABLED;
384 local_media->direction = STREAM_DIRECTION_NONE;
385 local_media->state = MPF_MEDIA_DISABLED;
386 if(rtp_stream->rtp_l_sockaddr) {
387 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Reject RTP Session %s:%hu no codecs matched",
388 rtp_stream->rtp_l_sockaddr->hostname,
389 rtp_stream->rtp_l_sockaddr->port);
390 }
391
392 if(rtp_stream->rtcp_tx_timer) {
393 apt_timer_kill(rtp_stream->rtcp_tx_timer);
394 }
395 if(rtp_stream->rtcp_rx_timer) {
396 apt_timer_kill(rtp_stream->rtcp_rx_timer);
397 }
398 }
399 }
400
401 rtp_stream->base->direction = local_media->direction;
402 return TRUE;
403 }
404
mpf_rtp_stream_add(mpf_audio_stream_t * stream)405 MPF_DECLARE(apt_bool_t) mpf_rtp_stream_add(mpf_audio_stream_t *stream)
406 {
407 return TRUE;
408 }
409
mpf_rtp_stream_remove(mpf_audio_stream_t * stream)410 MPF_DECLARE(apt_bool_t) mpf_rtp_stream_remove(mpf_audio_stream_t *stream)
411 {
412 mpf_rtp_stream_t *rtp_stream = stream->obj;
413
414 if(rtp_stream->state == MPF_MEDIA_ENABLED) {
415 /* disable RTP/RTCP session */
416 rtp_stream->state = MPF_MEDIA_DISABLED;
417 if(rtp_stream->rtp_l_sockaddr) {
418 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Remove RTP Session %s:%hu",
419 rtp_stream->rtp_l_sockaddr->hostname,
420 rtp_stream->rtp_l_sockaddr->port);
421 }
422
423 if(rtp_stream->rtcp_tx_timer) {
424 apt_timer_kill(rtp_stream->rtcp_tx_timer);
425 }
426 if(rtp_stream->rtcp_rx_timer) {
427 apt_timer_kill(rtp_stream->rtcp_rx_timer);
428 }
429 if(rtp_stream->settings->rtcp == TRUE && rtp_stream->settings->rtcp_bye_policy != RTCP_BYE_DISABLE) {
430 apt_str_t reason = {RTCP_BYE_SESSION_ENDED, sizeof(RTCP_BYE_SESSION_ENDED)-1};
431 mpf_rtcp_bye_send(rtp_stream,&reason);
432 }
433 }
434
435 mpf_rtp_socket_pair_close(rtp_stream);
436 return TRUE;
437 }
438
mpf_rtp_stream_modify(mpf_audio_stream_t * stream,mpf_rtp_stream_descriptor_t * descriptor)439 MPF_DECLARE(apt_bool_t) mpf_rtp_stream_modify(mpf_audio_stream_t *stream, mpf_rtp_stream_descriptor_t *descriptor)
440 {
441 apt_bool_t status = TRUE;
442 mpf_rtp_stream_t *rtp_stream = stream->obj;
443 if(!rtp_stream) {
444 return FALSE;
445 }
446
447 if(!rtp_stream->local_media) {
448 /* create local media */
449 status = mpf_rtp_stream_local_media_create(rtp_stream,descriptor->local,descriptor->remote,descriptor->capabilities);
450 }
451 else if(descriptor->local) {
452 /* update local media */
453 status = mpf_rtp_stream_local_media_update(rtp_stream,descriptor->local,descriptor->capabilities);
454 }
455
456 if(descriptor->remote && status == TRUE) {
457 /* update remote media */
458 mpf_rtp_stream_remote_media_update(rtp_stream,descriptor->remote);
459
460 /* negotiate local and remote media */
461 mpf_rtp_stream_media_negotiate(rtp_stream);
462 }
463
464 if((rtp_stream->base->direction & STREAM_DIRECTION_SEND) == STREAM_DIRECTION_SEND) {
465 mpf_codec_list_t *codec_list = &rtp_stream->remote_media->codec_list;
466 rtp_stream->base->tx_descriptor = codec_list->primary_descriptor;
467 if(rtp_stream->base->tx_descriptor) {
468 rtp_stream->transmitter.samples_per_frame =
469 (apr_uint32_t)mpf_codec_frame_samples_calculate(rtp_stream->base->tx_descriptor);
470 }
471 if(codec_list->event_descriptor) {
472 rtp_stream->base->tx_event_descriptor = codec_list->event_descriptor;
473 }
474 }
475 if((rtp_stream->base->direction & STREAM_DIRECTION_RECEIVE) == STREAM_DIRECTION_RECEIVE) {
476 mpf_codec_list_t *codec_list = &rtp_stream->local_media->codec_list;
477 rtp_stream->base->rx_descriptor = codec_list->primary_descriptor;
478 if(codec_list->event_descriptor) {
479 rtp_stream->base->rx_event_descriptor = codec_list->event_descriptor;
480 }
481 }
482
483 if(!descriptor->local) {
484 descriptor->local = rtp_stream->local_media;
485 }
486 return status;
487 }
488
mpf_rtp_stream_destroy(mpf_audio_stream_t * stream)489 static apt_bool_t mpf_rtp_stream_destroy(mpf_audio_stream_t *stream)
490 {
491 return TRUE;
492 }
493
mpf_rtp_rx_stream_open(mpf_audio_stream_t * stream,mpf_codec_t * codec)494 static apt_bool_t mpf_rtp_rx_stream_open(mpf_audio_stream_t *stream, mpf_codec_t *codec)
495 {
496 mpf_rtp_stream_t *rtp_stream = stream->obj;
497 rtp_receiver_t *receiver = &rtp_stream->receiver;
498 mpf_jb_config_t *jb_config = &rtp_stream->settings->jb_config;
499 if(!rtp_stream->rtp_socket || !rtp_stream->rtp_l_sockaddr || !rtp_stream->rtp_r_sockaddr) {
500 return FALSE;
501 }
502
503 receiver->jb = mpf_jitter_buffer_create(
504 jb_config,
505 stream->rx_descriptor,
506 codec,
507 rtp_stream->pool);
508
509 apt_log(APT_LOG_MARK,APT_PRIO_INFO,
510 "Open RTP Receiver %s:%hu <- %s:%hu playout [%u ms] bounds [%u - %u ms] adaptive [%d] skew detection [%d]",
511 rtp_stream->rtp_l_sockaddr->hostname,
512 rtp_stream->rtp_l_sockaddr->port,
513 rtp_stream->rtp_r_sockaddr->hostname,
514 rtp_stream->rtp_r_sockaddr->port,
515 jb_config->initial_playout_delay,
516 jb_config->min_playout_delay,
517 jb_config->max_playout_delay,
518 jb_config->adaptive,
519 jb_config->time_skew_detection);
520 return TRUE;
521 }
522
mpf_rtp_rx_stream_close(mpf_audio_stream_t * stream)523 static apt_bool_t mpf_rtp_rx_stream_close(mpf_audio_stream_t *stream)
524 {
525 mpf_rtp_stream_t *rtp_stream = stream->obj;
526 rtp_receiver_t *receiver = &rtp_stream->receiver;
527
528 if(!rtp_stream->rtp_l_sockaddr || !rtp_stream->rtp_r_sockaddr) {
529 return FALSE;
530 }
531
532 receiver->stat.lost_packets = 0;
533 if(receiver->stat.received_packets) {
534 apr_uint32_t expected_packets = receiver->history.seq_cycles +
535 receiver->history.seq_num_max - receiver->history.seq_num_base + 1;
536 if(expected_packets > receiver->stat.received_packets) {
537 receiver->stat.lost_packets = expected_packets - receiver->stat.received_packets;
538 }
539 }
540
541 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Close RTP Receiver %s:%hu <- %s:%hu [r:%u l:%u j:%u p:%u d:%u i:%u]",
542 rtp_stream->rtp_l_sockaddr->hostname,
543 rtp_stream->rtp_l_sockaddr->port,
544 rtp_stream->rtp_r_sockaddr->hostname,
545 rtp_stream->rtp_r_sockaddr->port,
546 receiver->stat.received_packets,
547 receiver->stat.lost_packets,
548 receiver->rr_stat.jitter,
549 mpf_jitter_buffer_playout_delay_get(receiver->jb),
550 receiver->stat.discarded_packets,
551 receiver->stat.ignored_packets);
552 mpf_jitter_buffer_destroy(receiver->jb);
553 return TRUE;
554 }
555
556
rtp_rx_overall_stat_reset(rtp_receiver_t * receiver)557 static APR_INLINE void rtp_rx_overall_stat_reset(rtp_receiver_t *receiver)
558 {
559 memset(&receiver->stat,0,sizeof(receiver->stat));
560 memset(&receiver->history,0,sizeof(receiver->history));
561 memset(&receiver->periodic_history,0,sizeof(receiver->periodic_history));
562 }
563
rtp_rx_stat_init(rtp_receiver_t * receiver,rtp_header_t * header,apr_time_t * time)564 static APR_INLINE void rtp_rx_stat_init(rtp_receiver_t *receiver, rtp_header_t *header, apr_time_t *time)
565 {
566 receiver->rr_stat.ssrc = header->ssrc;
567 receiver->history.seq_num_base = receiver->history.seq_num_max = (apr_uint16_t)header->sequence;
568 receiver->history.ts_last = header->timestamp;
569 receiver->history.time_last = *time;
570 }
571
rtp_rx_restart(rtp_receiver_t * receiver)572 static APR_INLINE void rtp_rx_restart(rtp_receiver_t *receiver)
573 {
574 apr_byte_t restarts = ++receiver->stat.restarts;
575 rtp_rx_overall_stat_reset(receiver);
576 mpf_jitter_buffer_restart(receiver->jb);
577 receiver->stat.restarts = restarts;
578 }
579
rtp_rx_header_skip(void ** buffer,apr_size_t * size)580 static rtp_header_t* rtp_rx_header_skip(void **buffer, apr_size_t *size)
581 {
582 apr_size_t offset = 0;
583 rtp_header_t *header = (rtp_header_t*)*buffer;
584
585 /* RTP header validity check */
586 if(header->version != RTP_VERSION) {
587 return NULL;
588 }
589
590 /* calculate payload offset */
591 offset = sizeof(rtp_header_t) + (header->count * sizeof(apr_uint32_t));
592
593 /* additional offset in case of RTP extension */
594 if(header->extension) {
595 rtp_extension_header_t *ext_header = (rtp_extension_header_t*)(((apr_byte_t*)*buffer)+offset);
596 offset += (ntohs(ext_header->length) * sizeof(apr_uint32_t));
597 }
598
599 if (offset >= *size) {
600 return NULL;
601 }
602
603 /* skip to payload */
604 *buffer = (apr_byte_t*)*buffer + offset;
605 *size = *size - offset;
606
607 return header;
608 }
609
rtp_periodic_history_update(rtp_receiver_t * receiver)610 static APR_INLINE void rtp_periodic_history_update(rtp_receiver_t *receiver)
611 {
612 apr_uint32_t expected_packets;
613 apr_uint32_t expected_interval;
614 apr_uint32_t received_interval;
615 apr_uint32_t lost_interval;
616
617 /* calculate expected packets */
618 if(receiver->stat.received_packets) {
619 expected_packets = receiver->history.seq_cycles +
620 receiver->history.seq_num_max - receiver->history.seq_num_base + 1;
621 }
622 else {
623 expected_packets = 0;
624 }
625
626 /* calculate expected interval */
627 expected_interval = expected_packets - receiver->periodic_history.expected_prior;
628 /* update expected prior */
629 receiver->periodic_history.expected_prior = expected_packets;
630
631 /* calculate received interval */
632 received_interval = receiver->stat.received_packets - receiver->periodic_history.received_prior;
633 /* update received prior */
634 receiver->periodic_history.received_prior = receiver->stat.received_packets;
635 /* calculate lost interval */
636 if(expected_interval > received_interval) {
637 lost_interval = expected_interval - received_interval;
638 }
639 else {
640 lost_interval = 0;
641 }
642
643 /* update lost fraction */
644 if(expected_interval == 0 || lost_interval == 0) {
645 receiver->rr_stat.fraction = 0;
646 }
647 else {
648 receiver->rr_stat.fraction = (lost_interval << 8) / expected_interval;
649 }
650
651 if(expected_packets > receiver->stat.received_packets) {
652 receiver->rr_stat.lost = expected_packets - receiver->stat.received_packets;
653 }
654 else {
655 receiver->rr_stat.lost = 0;
656 }
657
658 receiver->periodic_history.discarded_prior = receiver->stat.discarded_packets;
659 receiver->periodic_history.jitter_min = receiver->rr_stat.jitter;
660 receiver->periodic_history.jitter_max = receiver->rr_stat.jitter;
661 }
662
663 typedef enum {
664 RTP_SSRC_UPDATE,
665 RTP_SSRC_PROBATION,
666 RTP_SSRC_RESTART
667 } rtp_ssrc_result_e;
668
rtp_rx_ssrc_update(rtp_receiver_t * receiver,apr_uint32_t ssrc)669 static APR_INLINE rtp_ssrc_result_e rtp_rx_ssrc_update(rtp_receiver_t *receiver, apr_uint32_t ssrc)
670 {
671 if(receiver->rr_stat.ssrc == ssrc) {
672 /* known ssrc */
673 if(receiver->history.ssrc_probation) {
674 /* reset the probation for new ssrc */
675 receiver->history.ssrc_probation = 0;
676 receiver->history.ssrc_new = 0;
677 }
678 }
679 else {
680 if(receiver->history.ssrc_new == ssrc) {
681 if(--receiver->history.ssrc_probation == 0) {
682 /* restart with new ssrc */
683 receiver->rr_stat.ssrc = ssrc;
684 return RTP_SSRC_RESTART;
685 }
686 else {
687 return RTP_SSRC_PROBATION;
688 }
689 }
690 else {
691 /* start probation for new ssrc */
692 receiver->history.ssrc_new = ssrc;
693 receiver->history.ssrc_probation = 5;
694 return RTP_SSRC_PROBATION;
695 }
696 }
697 return RTP_SSRC_UPDATE;
698 }
699
700 typedef enum {
701 RTP_SEQ_UPDATE,
702 RTP_SEQ_MISORDER,
703 RTP_SEQ_DRIFT
704 } rtp_seq_result_e;
705
rtp_rx_seq_update(rtp_receiver_t * receiver,apr_uint16_t seq_num)706 static APR_INLINE rtp_seq_result_e rtp_rx_seq_update(rtp_receiver_t *receiver, apr_uint16_t seq_num)
707 {
708 rtp_seq_result_e result = RTP_SEQ_UPDATE;
709 apr_uint16_t seq_delta = seq_num - receiver->history.seq_num_max;
710 if(seq_delta < MAX_DROPOUT) {
711 if(seq_num < receiver->history.seq_num_max) {
712 /* sequence number wrapped */
713 receiver->history.seq_cycles += RTP_SEQ_MOD;
714 }
715 receiver->history.seq_num_max = seq_num;
716 }
717 else if(seq_delta <= RTP_SEQ_MOD - MAX_MISORDER) {
718 /* sequence number made a very large jump */
719 result = RTP_SEQ_DRIFT;
720 }
721 else {
722 /* duplicate or misordered packet */
723 result = RTP_SEQ_MISORDER;
724 }
725 receiver->stat.received_packets++;
726
727 return result;
728 }
729
730 typedef enum {
731 RTP_TS_UPDATE,
732 RTP_TS_DRIFT
733 } rtp_ts_result_e;
734
rtp_rx_ts_update(rtp_receiver_t * receiver,mpf_codec_descriptor_t * descriptor,apr_time_t * time,apr_uint32_t ts,apr_byte_t * marker)735 static APR_INLINE rtp_ts_result_e rtp_rx_ts_update(rtp_receiver_t *receiver, mpf_codec_descriptor_t *descriptor, apr_time_t *time, apr_uint32_t ts, apr_byte_t *marker)
736 {
737 apr_int32_t deviation;
738 apr_int32_t time_diff;
739
740 /* arrival time diff in msec */
741 time_diff = (apr_int32_t)apr_time_as_msec(*time - receiver->history.time_last);
742
743 /* if the time difference is more than the threshold (INTER_TALKSPURT_GAP),
744 and the marker is not set, then this might be a beginning of a
745 new malformed talkspurt */
746 if(!*marker && time_diff > INTER_TALKSPURT_GAP) {
747 /* set the missing marker */
748 *marker = 1;
749 }
750
751 /* arrival time diff in samples */
752 deviation = time_diff * descriptor->channel_count * descriptor->sampling_rate / 1000;
753 /* arrival timestamp diff */
754 deviation -= ts - receiver->history.ts_last;
755
756 if(deviation < 0) {
757 deviation = -deviation;
758 }
759
760 if(deviation > DEVIATION_THRESHOLD) {
761 return RTP_TS_DRIFT;
762 }
763
764 receiver->rr_stat.jitter += deviation - ((receiver->rr_stat.jitter + 8) >> 4);
765 RTP_TRACE("jitter=%u deviation=%d\n",receiver->rr_stat.jitter,deviation);
766 receiver->history.time_last = *time;
767 receiver->history.ts_last = ts;
768
769 if(receiver->rr_stat.jitter < receiver->periodic_history.jitter_min) {
770 receiver->periodic_history.jitter_min = receiver->rr_stat.jitter;
771 }
772 if(receiver->rr_stat.jitter > receiver->periodic_history.jitter_max) {
773 receiver->periodic_history.jitter_max = receiver->rr_stat.jitter;
774 }
775 return RTP_TS_UPDATE;
776 }
777
rtp_rx_failure_threshold_check(rtp_receiver_t * receiver)778 static APR_INLINE void rtp_rx_failure_threshold_check(rtp_receiver_t *receiver)
779 {
780 apr_uint32_t received;
781 apr_uint32_t discarded;
782 received = receiver->stat.received_packets - receiver->periodic_history.received_prior;
783 discarded = receiver->stat.discarded_packets - receiver->periodic_history.discarded_prior;
784
785 if(discarded * 100 > received * DISCARDED_TO_RECEIVED_RATIO_THRESHOLD) {
786 /* failure threshold reached -> restart */
787 rtp_rx_restart(receiver);
788 }
789 }
790
rtp_rx_packet_receive(mpf_rtp_stream_t * rtp_stream,void * buffer,apr_size_t size)791 static apt_bool_t rtp_rx_packet_receive(mpf_rtp_stream_t *rtp_stream, void *buffer, apr_size_t size)
792 {
793 rtp_receiver_t *receiver = &rtp_stream->receiver;
794 mpf_codec_descriptor_t *descriptor = rtp_stream->base->rx_descriptor;
795 apr_time_t time;
796 rtp_ssrc_result_e ssrc_result;
797 rtp_header_t *header = rtp_rx_header_skip(&buffer,&size);
798 if(!header) {
799 /* invalid RTP packet */
800 receiver->stat.invalid_packets++;
801 return FALSE;
802 }
803
804 header->sequence = ntohs((apr_uint16_t)header->sequence);
805 header->timestamp = ntohl(header->timestamp);
806 header->ssrc = ntohl(header->ssrc);
807
808 time = apr_time_now();
809
810 RTP_TRACE("RTP time=%6u ssrc=%8x pt=%3u %cts=%9u seq=%5u size=%"APR_SIZE_T_FMT"\n",
811 (apr_uint32_t)apr_time_usec(time),
812 header->ssrc, header->type, (header->marker == 1) ? '*' : ' ',
813 header->timestamp, header->sequence, size);
814 if(!receiver->stat.received_packets) {
815 /* initialization */
816 rtp_rx_stat_init(receiver,header,&time);
817 }
818
819 ssrc_result = rtp_rx_ssrc_update(receiver,header->ssrc);
820 if(ssrc_result == RTP_SSRC_PROBATION) {
821 receiver->stat.invalid_packets++;
822 return FALSE;
823 }
824 else if(ssrc_result == RTP_SSRC_RESTART) {
825 rtp_rx_restart(receiver);
826 rtp_rx_stat_init(receiver,header,&time);
827 }
828
829 rtp_rx_seq_update(receiver,(apr_uint16_t)header->sequence);
830
831 if(header->type == descriptor->payload_type) {
832 /* codec */
833 apr_byte_t marker = (apr_byte_t)header->marker;
834 if(rtp_rx_ts_update(receiver,descriptor,&time,header->timestamp,&marker) == RTP_TS_DRIFT) {
835 rtp_rx_restart(receiver);
836 return FALSE;
837 }
838
839 if(mpf_jitter_buffer_write(receiver->jb,buffer,size,header->timestamp,marker) != JB_OK) {
840 receiver->stat.discarded_packets++;
841 rtp_rx_failure_threshold_check(receiver);
842 }
843 }
844 else if(rtp_stream->base->rx_event_descriptor &&
845 header->type == rtp_stream->base->rx_event_descriptor->payload_type) {
846 /* named event */
847 mpf_named_event_frame_t *named_event = (mpf_named_event_frame_t *)buffer;
848 named_event->duration = ntohs((apr_uint16_t)named_event->duration);
849 if(mpf_jitter_buffer_event_write(receiver->jb,named_event,header->timestamp,(apr_byte_t)header->marker) != JB_OK) {
850 receiver->stat.discarded_packets++;
851 }
852 }
853 else if(header->type == RTP_PT_CN) {
854 /* CN packet */
855 receiver->stat.ignored_packets++;
856 }
857 else {
858 /* invalid payload type */
859 receiver->stat.ignored_packets++;
860 }
861
862 return TRUE;
863 }
864
rtp_rx_process(mpf_rtp_stream_t * rtp_stream)865 static apt_bool_t rtp_rx_process(mpf_rtp_stream_t *rtp_stream)
866 {
867 char buffer[MAX_RTP_PACKET_SIZE];
868 apr_size_t size = sizeof(buffer);
869 apr_size_t max_count = 5;
870 while(max_count && apr_socket_recv(rtp_stream->rtp_socket,buffer,&size) == APR_SUCCESS) {
871 rtp_rx_packet_receive(rtp_stream,buffer,size);
872
873 size = sizeof(buffer);
874 max_count--;
875 }
876 return TRUE;
877 }
878
mpf_rtp_stream_receive(mpf_audio_stream_t * stream,mpf_frame_t * frame)879 static apt_bool_t mpf_rtp_stream_receive(mpf_audio_stream_t *stream, mpf_frame_t *frame)
880 {
881 mpf_rtp_stream_t *rtp_stream = stream->obj;
882 rtp_rx_process(rtp_stream);
883
884 return mpf_jitter_buffer_read(rtp_stream->receiver.jb,frame);
885 }
886
887
mpf_rtp_tx_stream_open(mpf_audio_stream_t * stream,mpf_codec_t * codec)888 static apt_bool_t mpf_rtp_tx_stream_open(mpf_audio_stream_t *stream, mpf_codec_t *codec)
889 {
890 apr_size_t frame_size;
891 mpf_rtp_stream_t *rtp_stream = stream->obj;
892 rtp_transmitter_t *transmitter = &rtp_stream->transmitter;
893
894 if(!rtp_stream->rtp_socket || !rtp_stream->rtp_l_sockaddr || !rtp_stream->rtp_r_sockaddr) {
895 return FALSE;
896 }
897
898 if(!codec) {
899 return FALSE;
900 }
901
902 if(!transmitter->ptime) {
903 if(rtp_stream->settings && rtp_stream->settings->ptime) {
904 transmitter->ptime = rtp_stream->settings->ptime;
905 }
906 else {
907 transmitter->ptime = 20;
908 }
909 }
910 transmitter->packet_frames = transmitter->ptime / CODEC_FRAME_TIME_BASE;
911 transmitter->current_frames = 0;
912
913 frame_size = mpf_codec_frame_size_calculate(
914 stream->tx_descriptor,
915 codec->attribs);
916 transmitter->packet_data = apr_palloc(
917 rtp_stream->pool,
918 sizeof(rtp_header_t) + transmitter->packet_frames * frame_size);
919
920 transmitter->inactivity = 1;
921 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Open RTP Transmitter %s:%hu -> %s:%hu",
922 rtp_stream->rtp_l_sockaddr->hostname,
923 rtp_stream->rtp_l_sockaddr->port,
924 rtp_stream->rtp_r_sockaddr->hostname,
925 rtp_stream->rtp_r_sockaddr->port);
926 return TRUE;
927 }
928
mpf_rtp_tx_stream_close(mpf_audio_stream_t * stream)929 static apt_bool_t mpf_rtp_tx_stream_close(mpf_audio_stream_t *stream)
930 {
931 mpf_rtp_stream_t *rtp_stream = stream->obj;
932 if(!rtp_stream->rtp_l_sockaddr || !rtp_stream->rtp_r_sockaddr) {
933 return FALSE;
934 }
935 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Close RTP Transmitter %s:%hu -> %s:%hu [s:%u o:%u]",
936 rtp_stream->rtp_l_sockaddr->hostname,
937 rtp_stream->rtp_l_sockaddr->port,
938 rtp_stream->rtp_r_sockaddr->hostname,
939 rtp_stream->rtp_r_sockaddr->port,
940 rtp_stream->transmitter.sr_stat.sent_packets,
941 rtp_stream->transmitter.sr_stat.sent_octets);
942 return TRUE;
943 }
944
945
rtp_header_prepare(rtp_transmitter_t * transmitter,rtp_header_t * header,apr_byte_t payload_type,apr_byte_t marker,apr_uint32_t timestamp)946 static APR_INLINE void rtp_header_prepare(
947 rtp_transmitter_t *transmitter,
948 rtp_header_t *header,
949 apr_byte_t payload_type,
950 apr_byte_t marker,
951 apr_uint32_t timestamp)
952 {
953 header->version = RTP_VERSION;
954 header->padding = 0;
955 header->extension = 0;
956 header->count = 0;
957 header->marker = marker;
958 header->type = payload_type;
959 header->timestamp = timestamp;
960 header->ssrc = htonl(transmitter->sr_stat.ssrc);
961 }
962
mpf_rtp_data_send(mpf_rtp_stream_t * rtp_stream,rtp_transmitter_t * transmitter,const mpf_frame_t * frame)963 static APR_INLINE apt_bool_t mpf_rtp_data_send(mpf_rtp_stream_t *rtp_stream, rtp_transmitter_t *transmitter, const mpf_frame_t *frame)
964 {
965 apt_bool_t status = TRUE;
966 memcpy(
967 transmitter->packet_data + transmitter->packet_size,
968 frame->codec_frame.buffer,
969 frame->codec_frame.size);
970 transmitter->packet_size += frame->codec_frame.size;
971
972 if(++transmitter->current_frames == transmitter->packet_frames) {
973 rtp_header_t *header = (rtp_header_t*)transmitter->packet_data;
974 header->sequence = htons(++transmitter->last_seq_num);
975 RTP_TRACE("> RTP time=%6u ssrc=%8x pt=%3u %cts=%9u seq=%5hu\n",
976 (apr_uint32_t)apr_time_usec(apr_time_now()),
977 transmitter->sr_stat.ssrc, header->type,
978 (header->marker == 1) ? '*' : ' ',
979 header->timestamp, transmitter->last_seq_num);
980 header->timestamp = htonl(header->timestamp);
981 if(apr_socket_sendto(
982 rtp_stream->rtp_socket,
983 rtp_stream->rtp_r_sockaddr,
984 0,
985 transmitter->packet_data,
986 &transmitter->packet_size) == APR_SUCCESS) {
987 transmitter->sr_stat.sent_packets++;
988 transmitter->sr_stat.sent_octets += (apr_uint32_t)transmitter->packet_size - sizeof(rtp_header_t);
989 }
990 else {
991 status = FALSE;
992 }
993 transmitter->current_frames = 0;
994 }
995 return status;
996 }
997
mpf_rtp_event_send(mpf_rtp_stream_t * rtp_stream,rtp_transmitter_t * transmitter,const mpf_frame_t * frame)998 static APR_INLINE apt_bool_t mpf_rtp_event_send(mpf_rtp_stream_t *rtp_stream, rtp_transmitter_t *transmitter, const mpf_frame_t *frame)
999 {
1000 char packet_data[20];
1001 apr_size_t packet_size = sizeof(rtp_header_t) + sizeof(mpf_named_event_frame_t);
1002 rtp_header_t *header = (rtp_header_t*) packet_data;
1003 mpf_named_event_frame_t *named_event = (mpf_named_event_frame_t*)(header+1);
1004 rtp_header_prepare(
1005 transmitter,
1006 header,
1007 rtp_stream->base->tx_event_descriptor->payload_type,
1008 (frame->marker == MPF_MARKER_START_OF_EVENT) ? 1 : 0,
1009 transmitter->timestamp_base);
1010
1011 *named_event = frame->event_frame;
1012 named_event->edge = (frame->marker == MPF_MARKER_END_OF_EVENT) ? 1 : 0;
1013
1014 header->sequence = htons(++transmitter->last_seq_num);
1015 RTP_TRACE("> RTP time=%6u ssrc=%8x pt=%3u %cts=%9u seq=%hu event=%2u dur=%3u %c\n",
1016 (apr_uint32_t)apr_time_usec(apr_time_now()),
1017 transmitter->sr_stat.ssrc,
1018 header->type, (header->marker == 1) ? '*' : ' ',
1019 header->timestamp, transmitter->last_seq_num,
1020 named_event->event_id, named_event->duration,
1021 (named_event->edge == 1) ? '*' : ' ');
1022 header->timestamp = htonl(header->timestamp);
1023 named_event->duration = htons((apr_uint16_t)named_event->duration);
1024 if(apr_socket_sendto(
1025 rtp_stream->rtp_socket,
1026 rtp_stream->rtp_r_sockaddr,
1027 0,
1028 packet_data,
1029 &packet_size) != APR_SUCCESS) {
1030 return FALSE;
1031 }
1032 transmitter->sr_stat.sent_packets++;
1033 transmitter->sr_stat.sent_octets += sizeof(mpf_named_event_frame_t);
1034 return TRUE;
1035 }
1036
mpf_rtp_stream_transmit(mpf_audio_stream_t * stream,const mpf_frame_t * frame)1037 static apt_bool_t mpf_rtp_stream_transmit(mpf_audio_stream_t *stream, const mpf_frame_t *frame)
1038 {
1039 apt_bool_t status = TRUE;
1040 mpf_rtp_stream_t *rtp_stream = stream->obj;
1041 rtp_transmitter_t *transmitter = &rtp_stream->transmitter;
1042
1043 transmitter->timestamp += transmitter->samples_per_frame;
1044
1045 if(frame->type == MEDIA_FRAME_TYPE_NONE) {
1046 if(!transmitter->inactivity) {
1047 if(transmitter->current_frames == 0) {
1048 /* set inactivity (ptime alligned) */
1049 transmitter->inactivity = 1;
1050 if(rtp_stream->settings->rtcp == TRUE && rtp_stream->settings->rtcp_bye_policy == RTCP_BYE_PER_TALKSPURT) {
1051 apt_str_t reason = {RTCP_BYE_TALKSPURT_ENDED, sizeof(RTCP_BYE_TALKSPURT_ENDED)-1};
1052 mpf_rtcp_bye_send(rtp_stream,&reason);
1053 }
1054 }
1055 else {
1056 /* ptime allignment */
1057 status = mpf_rtp_data_send(rtp_stream,transmitter,frame);
1058 }
1059 }
1060 return status;
1061 }
1062
1063 if((frame->type & MEDIA_FRAME_TYPE_EVENT) == MEDIA_FRAME_TYPE_EVENT){
1064 /* transmit event as soon as received */
1065 if(stream->tx_event_descriptor) {
1066 if(frame->marker == MPF_MARKER_START_OF_EVENT) {
1067 /* store start time (base) of the event */
1068 transmitter->timestamp_base = transmitter->timestamp;
1069 }
1070 else if(frame->marker == MPF_MARKER_NEW_SEGMENT) {
1071 /* update base in case of long-lasting events */
1072 transmitter->timestamp_base = transmitter->timestamp;
1073 }
1074
1075 status = mpf_rtp_event_send(rtp_stream,transmitter,frame);
1076 }
1077 }
1078
1079 if((frame->type & MEDIA_FRAME_TYPE_AUDIO) == MEDIA_FRAME_TYPE_AUDIO){
1080 if(transmitter->current_frames == 0) {
1081 rtp_header_t *header = (rtp_header_t*)transmitter->packet_data;
1082 rtp_header_prepare(
1083 transmitter,
1084 header,
1085 stream->tx_descriptor->payload_type,
1086 transmitter->inactivity,
1087 transmitter->timestamp);
1088 transmitter->packet_size = sizeof(rtp_header_t);
1089 if(transmitter->inactivity) {
1090 transmitter->inactivity = 0;
1091 }
1092 }
1093 status = mpf_rtp_data_send(rtp_stream,transmitter,frame);
1094 }
1095
1096 return status;
1097 }
1098
mpf_socket_create(apr_pool_t * pool,apr_socket_t ** socket)1099 static apt_bool_t mpf_socket_create(apr_pool_t *pool, apr_socket_t **socket)
1100 {
1101 if(!socket)
1102 return FALSE;
1103
1104 if(apr_socket_create(socket,APR_INET,SOCK_DGRAM,0,pool) != APR_SUCCESS) {
1105 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Create Socket");
1106 *socket = NULL;
1107 return FALSE;
1108 }
1109
1110 apr_socket_opt_set(*socket,APR_SO_NONBLOCK,1);
1111 apr_socket_timeout_set(*socket,0);
1112 return TRUE;
1113 }
1114
mpf_socket_bind(apr_socket_t * socket,const char * ip,apr_port_t port,apr_pool_t * pool,apr_sockaddr_t ** l_sockaddr)1115 static apt_bool_t mpf_socket_bind(apr_socket_t *socket, const char *ip, apr_port_t port, apr_pool_t *pool, apr_sockaddr_t **l_sockaddr)
1116 {
1117 if(!socket || !l_sockaddr)
1118 return FALSE;
1119
1120 *l_sockaddr = NULL;
1121 apr_sockaddr_info_get(
1122 l_sockaddr,
1123 ip,
1124 APR_INET,
1125 port,
1126 0,
1127 pool);
1128 if(!*l_sockaddr) {
1129 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Get Sockaddr %s:%hu",ip,port);
1130 return FALSE;
1131 }
1132
1133 if(apr_socket_bind(socket,*l_sockaddr) != APR_SUCCESS) {
1134 apt_log(APT_LOG_MARK,APT_PRIO_DEBUG,"Failed to Bind Socket to %s:%hu", ip,port);
1135 return FALSE;
1136 }
1137 return TRUE;
1138 }
1139
1140 /* Create RTP/RTCP sockets */
mpf_rtp_socket_pair_create(mpf_rtp_stream_t * stream,mpf_rtp_media_descriptor_t * local_media,apt_bool_t bind)1141 static apt_bool_t mpf_rtp_socket_pair_create(mpf_rtp_stream_t *stream, mpf_rtp_media_descriptor_t *local_media, apt_bool_t bind)
1142 {
1143 /* Create and optionally bind RTP socket. Return FALSE in case of an error. */
1144 if(mpf_socket_create(stream->pool,&stream->rtp_socket) == FALSE) {
1145 return FALSE;
1146 }
1147 if(bind == TRUE) {
1148 if(mpf_socket_bind(stream->rtp_socket,local_media->ip.buf,local_media->port,stream->pool,&stream->rtp_l_sockaddr) == FALSE) {
1149 apr_socket_close(stream->rtp_socket);
1150 stream->rtp_socket = NULL;
1151 return FALSE;
1152 }
1153 }
1154
1155 /* Create and optionally bind RCTP socket. Continue in either way. */
1156 if(mpf_socket_create(stream->pool,&stream->rtcp_socket) == TRUE && bind == TRUE) {
1157 if(mpf_socket_bind(stream->rtcp_socket,local_media->ip.buf,local_media->port+1,stream->pool,&stream->rtcp_l_sockaddr) == FALSE) {
1158 apr_socket_close(stream->rtcp_socket);
1159 stream->rtcp_socket = NULL;
1160 }
1161 }
1162 return TRUE;
1163 }
1164
1165 /* Bind RTP/RTCP sockets */
mpf_rtp_socket_pair_bind(mpf_rtp_stream_t * stream,mpf_rtp_media_descriptor_t * local_media)1166 static apt_bool_t mpf_rtp_socket_pair_bind(mpf_rtp_stream_t *stream, mpf_rtp_media_descriptor_t *local_media)
1167 {
1168 /* Bind RTP socket. Return FALSE in case of an error. */
1169 if(mpf_socket_bind(stream->rtp_socket,local_media->ip.buf,local_media->port,stream->pool,&stream->rtp_l_sockaddr) == FALSE) {
1170 return FALSE;
1171 }
1172
1173 /* Try to bind RTCP socket. Continue in either way. */
1174 mpf_socket_bind(stream->rtcp_socket,local_media->ip.buf,local_media->port+1,stream->pool,&stream->rtcp_l_sockaddr);
1175 return TRUE;
1176 }
1177
1178 /* Close RTP/RTCP sockets */
mpf_rtp_socket_pair_close(mpf_rtp_stream_t * stream)1179 static void mpf_rtp_socket_pair_close(mpf_rtp_stream_t *stream)
1180 {
1181 if(stream->rtp_socket) {
1182 apr_socket_close(stream->rtp_socket);
1183 stream->rtp_socket = NULL;
1184 }
1185 if(stream->rtcp_socket) {
1186 apr_socket_close(stream->rtcp_socket);
1187 stream->rtcp_socket = NULL;
1188 }
1189 }
1190
1191
1192
rtcp_sr_generate(mpf_rtp_stream_t * rtp_stream,rtcp_sr_stat_t * sr_stat)1193 static APR_INLINE void rtcp_sr_generate(mpf_rtp_stream_t *rtp_stream, rtcp_sr_stat_t *sr_stat)
1194 {
1195 *sr_stat = rtp_stream->transmitter.sr_stat;
1196 apt_ntp_time_get(&sr_stat->ntp_sec, &sr_stat->ntp_frac);
1197 sr_stat->rtp_ts = rtp_stream->transmitter.timestamp;
1198
1199 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Generate RTCP SR [ssrc:%u s:%u o:%u ts:%u]",
1200 sr_stat->ssrc,
1201 sr_stat->sent_packets,
1202 sr_stat->sent_octets,
1203 sr_stat->rtp_ts);
1204 rtcp_sr_hton(sr_stat);
1205 }
1206
rtcp_rr_generate(mpf_rtp_stream_t * rtp_stream,rtcp_rr_stat_t * rr_stat)1207 static APR_INLINE void rtcp_rr_generate(mpf_rtp_stream_t *rtp_stream, rtcp_rr_stat_t *rr_stat)
1208 {
1209 *rr_stat = rtp_stream->receiver.rr_stat;
1210 rr_stat->last_seq = rtp_stream->receiver.history.seq_num_max;
1211
1212 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Generate RTCP RR [ssrc:%u last_seq:%u j:%u lost:%u frac:%d]",
1213 rr_stat->ssrc,
1214 rr_stat->last_seq,
1215 rr_stat->jitter,
1216 rr_stat->lost,
1217 rr_stat->fraction);
1218 rtcp_rr_hton(rr_stat);
1219 }
1220
1221 /* Generate either RTCP SR or RTCP RR packet */
rtcp_report_generate(mpf_rtp_stream_t * rtp_stream,rtcp_packet_t * rtcp_packet,apr_size_t length)1222 static APR_INLINE apr_size_t rtcp_report_generate(mpf_rtp_stream_t *rtp_stream, rtcp_packet_t *rtcp_packet, apr_size_t length)
1223 {
1224 apr_size_t offset = 0;
1225 rtcp_header_init(&rtcp_packet->header,RTCP_RR);
1226 if(rtp_stream->base->direction & STREAM_DIRECTION_SEND) {
1227 rtcp_packet->header.pt = RTCP_SR;
1228 }
1229 if(rtp_stream->base->direction & STREAM_DIRECTION_RECEIVE) {
1230 rtcp_packet->header.count = 1;
1231 }
1232 offset += sizeof(rtcp_header_t);
1233
1234 if(rtcp_packet->header.pt == RTCP_SR) {
1235 rtcp_sr_generate(rtp_stream,&rtcp_packet->r.sr.sr_stat);
1236 offset += sizeof(rtcp_sr_stat_t);
1237 if(rtcp_packet->header.count) {
1238 rtcp_rr_generate(rtp_stream,rtcp_packet->r.sr.rr_stat);
1239 offset += sizeof(rtcp_rr_stat_t);
1240 }
1241 }
1242 else if(rtcp_packet->header.pt == RTCP_RR) {
1243 rtcp_packet->r.rr.ssrc = htonl(rtp_stream->transmitter.sr_stat.ssrc);
1244 rtcp_rr_generate(rtp_stream,rtcp_packet->r.rr.rr_stat);
1245 offset += sizeof(rtcp_packet->r.rr);
1246 }
1247 rtcp_header_length_set(&rtcp_packet->header,offset);
1248 return offset;
1249 }
1250
1251 /* Generate RTCP SDES packet */
rtcp_sdes_generate(mpf_rtp_stream_t * rtp_stream,rtcp_packet_t * rtcp_packet,apr_size_t length)1252 static APR_INLINE apr_size_t rtcp_sdes_generate(mpf_rtp_stream_t *rtp_stream, rtcp_packet_t *rtcp_packet, apr_size_t length)
1253 {
1254 rtcp_sdes_item_t *item;
1255 apr_size_t offset = 0;
1256 apr_size_t padding;
1257 rtcp_header_init(&rtcp_packet->header,RTCP_SDES);
1258 offset += sizeof(rtcp_header_t);
1259
1260 rtcp_packet->header.count ++;
1261 rtcp_packet->r.sdes.ssrc = htonl(rtp_stream->transmitter.sr_stat.ssrc);
1262 offset += sizeof(apr_uint32_t);
1263
1264 /* insert SDES CNAME item */
1265 item = &rtcp_packet->r.sdes.item[0];
1266 item->type = RTCP_SDES_CNAME;
1267 item->length = (apr_byte_t)rtp_stream->local_media->ip.length;
1268 memcpy(item->data,rtp_stream->local_media->ip.buf,item->length);
1269 offset += sizeof(rtcp_sdes_item_t) - 1 + item->length;
1270
1271 /* terminate with end marker and pad to next 4-octet boundary */
1272 padding = 4 - (offset & 0x3);
1273 while(padding--) {
1274 item = (rtcp_sdes_item_t*) ((char*)rtcp_packet + offset);
1275 item->type = RTCP_SDES_END;
1276 offset++;
1277 }
1278
1279 rtcp_header_length_set(&rtcp_packet->header,offset);
1280 return offset;
1281 }
1282
1283 /* Generate RTCP BYE packet */
rtcp_bye_generate(mpf_rtp_stream_t * rtp_stream,rtcp_packet_t * rtcp_packet,apr_size_t length,apt_str_t * reason)1284 static APR_INLINE apr_size_t rtcp_bye_generate(mpf_rtp_stream_t *rtp_stream, rtcp_packet_t *rtcp_packet, apr_size_t length, apt_str_t *reason)
1285 {
1286 apr_size_t offset = 0;
1287 rtcp_header_init(&rtcp_packet->header,RTCP_BYE);
1288 offset += sizeof(rtcp_header_t);
1289
1290 rtcp_packet->r.bye.ssrc[0] = htonl(rtp_stream->transmitter.sr_stat.ssrc);
1291 rtcp_packet->header.count++;
1292 offset += rtcp_packet->header.count * sizeof(apr_uint32_t);
1293
1294 if(reason->length) {
1295 apr_size_t padding;
1296
1297 memcpy(rtcp_packet->r.bye.data,reason->buf,reason->length);
1298 rtcp_packet->r.bye.length = (apr_byte_t)reason->length;
1299 offset += rtcp_packet->r.bye.length;
1300
1301 /* terminate with end marker and pad to next 4-octet boundary */
1302 padding = 4 - (reason->length & 0x3);
1303 if(padding) {
1304 char *end = rtcp_packet->r.bye.data + reason->length;
1305 memset(end,0,padding);
1306 offset += padding;
1307 }
1308 }
1309
1310 rtcp_header_length_set(&rtcp_packet->header,offset);
1311 return offset;
1312 }
1313
1314 /* Send compound RTCP packet (SR/RR + SDES) */
mpf_rtcp_report_send(mpf_rtp_stream_t * rtp_stream)1315 static apt_bool_t mpf_rtcp_report_send(mpf_rtp_stream_t *rtp_stream)
1316 {
1317 char buffer[MAX_RTCP_PACKET_SIZE];
1318 apr_size_t length = 0;
1319 rtcp_packet_t *rtcp_packet;
1320
1321 if(!rtp_stream->rtcp_socket || !rtp_stream->rtcp_l_sockaddr || !rtp_stream->rtcp_r_sockaddr) {
1322 /* session is not initialized */
1323 return FALSE;
1324 }
1325
1326 if(rtp_stream->base->direction != STREAM_DIRECTION_NONE) {
1327 /* update periodic (prior) history */
1328 rtp_periodic_history_update(&rtp_stream->receiver);
1329 }
1330
1331 rtcp_packet = (rtcp_packet_t*) (buffer + length);
1332 length += rtcp_report_generate(rtp_stream,rtcp_packet,sizeof(buffer)-length);
1333
1334 rtcp_packet = (rtcp_packet_t*) (buffer + length);
1335 length += rtcp_sdes_generate(rtp_stream,rtcp_packet,sizeof(buffer)-length);
1336
1337 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Send Compound RTCP Packet [%"APR_SIZE_T_FMT" bytes] %s:%hu -> %s:%hu",
1338 length,
1339 rtp_stream->rtcp_l_sockaddr->hostname,
1340 rtp_stream->rtcp_l_sockaddr->port,
1341 rtp_stream->rtcp_r_sockaddr->hostname,
1342 rtp_stream->rtcp_r_sockaddr->port);
1343 if(apr_socket_sendto(
1344 rtp_stream->rtcp_socket,
1345 rtp_stream->rtcp_r_sockaddr,
1346 0,
1347 buffer,
1348 &length) != APR_SUCCESS) {
1349 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Send Compound RTCP Packet [%"APR_SIZE_T_FMT" bytes] %s:%hu -> %s:%hu",
1350 length,
1351 rtp_stream->rtcp_l_sockaddr->hostname,
1352 rtp_stream->rtcp_l_sockaddr->port,
1353 rtp_stream->rtcp_r_sockaddr->hostname,
1354 rtp_stream->rtcp_r_sockaddr->port);
1355 return FALSE;
1356 }
1357 return TRUE;
1358 }
1359
1360 /* Send compound RTCP packet (SR/RR + SDES + BYE) */
mpf_rtcp_bye_send(mpf_rtp_stream_t * rtp_stream,apt_str_t * reason)1361 static apt_bool_t mpf_rtcp_bye_send(mpf_rtp_stream_t *rtp_stream, apt_str_t *reason)
1362 {
1363 char buffer[MAX_RTCP_PACKET_SIZE];
1364 apr_size_t length = 0;
1365 rtcp_packet_t *rtcp_packet;
1366
1367 if(!rtp_stream->rtcp_socket || !rtp_stream->rtcp_l_sockaddr || !rtp_stream->rtcp_r_sockaddr) {
1368 /* session is not initialized */
1369 return FALSE;
1370 }
1371
1372 if(rtp_stream->base->direction != STREAM_DIRECTION_NONE) {
1373 /* update periodic (prior) history */
1374 rtp_periodic_history_update(&rtp_stream->receiver);
1375 }
1376
1377 rtcp_packet = (rtcp_packet_t*) (buffer + length);
1378 length += rtcp_report_generate(rtp_stream,rtcp_packet,sizeof(buffer)-length);
1379
1380 rtcp_packet = (rtcp_packet_t*) (buffer + length);
1381 length += rtcp_sdes_generate(rtp_stream,rtcp_packet,sizeof(buffer)-length);
1382
1383 rtcp_packet = (rtcp_packet_t*) (buffer + length);
1384 length += rtcp_bye_generate(rtp_stream,rtcp_packet,sizeof(buffer)-length,reason);
1385
1386 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Send Compound RTCP Packet [BYE] [%"APR_SIZE_T_FMT" bytes] %s:%hu -> %s:%hu",
1387 length,
1388 rtp_stream->rtcp_l_sockaddr->hostname,
1389 rtp_stream->rtcp_l_sockaddr->port,
1390 rtp_stream->rtcp_r_sockaddr->hostname,
1391 rtp_stream->rtcp_r_sockaddr->port);
1392 if(apr_socket_sendto(
1393 rtp_stream->rtcp_socket,
1394 rtp_stream->rtcp_r_sockaddr,
1395 0,
1396 buffer,
1397 &length) != APR_SUCCESS) {
1398 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Failed to Send Compound RTCP Packet [BYE] [%"APR_SIZE_T_FMT" bytes] %s:%hu -> %s:%hu",
1399 length,
1400 rtp_stream->rtcp_l_sockaddr->hostname,
1401 rtp_stream->rtcp_l_sockaddr->port,
1402 rtp_stream->rtcp_r_sockaddr->hostname,
1403 rtp_stream->rtcp_r_sockaddr->port);
1404 return FALSE;
1405 }
1406 return TRUE;
1407 }
1408
rtcp_sr_get(mpf_rtp_stream_t * rtp_stream,rtcp_sr_stat_t * sr_stat)1409 static APR_INLINE void rtcp_sr_get(mpf_rtp_stream_t *rtp_stream, rtcp_sr_stat_t *sr_stat)
1410 {
1411 rtcp_sr_ntoh(sr_stat);
1412 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Get RTCP SR [ssrc:%u s:%u o:%u ts:%u]",
1413 sr_stat->ssrc,
1414 sr_stat->sent_packets,
1415 sr_stat->sent_octets,
1416 sr_stat->rtp_ts);
1417 }
1418
rtcp_rr_get(mpf_rtp_stream_t * rtp_stream,rtcp_rr_stat_t * rr_stat)1419 static APR_INLINE void rtcp_rr_get(mpf_rtp_stream_t *rtp_stream, rtcp_rr_stat_t *rr_stat)
1420 {
1421 rtcp_rr_ntoh(rr_stat);
1422 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Get RTCP RR [ssrc:%u last_seq:%u j:%u lost:%u frac:%d]",
1423 rr_stat->ssrc,
1424 rr_stat->last_seq,
1425 rr_stat->jitter,
1426 rr_stat->lost,
1427 rr_stat->fraction);
1428 }
1429
mpf_rtcp_compound_packet_receive(mpf_rtp_stream_t * rtp_stream,char * buffer,apr_size_t length)1430 static apt_bool_t mpf_rtcp_compound_packet_receive(mpf_rtp_stream_t *rtp_stream, char *buffer, apr_size_t length)
1431 {
1432 rtcp_packet_t *rtcp_packet = (rtcp_packet_t*) buffer;
1433 rtcp_packet_t *rtcp_packet_end;
1434
1435 rtcp_packet_end = (rtcp_packet_t*)(buffer + length);
1436
1437 while(rtcp_packet < rtcp_packet_end && rtcp_packet->header.version == RTP_VERSION) {
1438 rtcp_packet->header.length = ntohs((apr_uint16_t)rtcp_packet->header.length);
1439
1440 if(rtcp_packet->header.pt == RTCP_SR) {
1441 /* RTCP SR */
1442 rtcp_sr_get(rtp_stream,&rtcp_packet->r.sr.sr_stat);
1443 if(rtcp_packet->header.count) {
1444 rtcp_rr_get(rtp_stream,rtcp_packet->r.sr.rr_stat);
1445 }
1446 }
1447 else if(rtcp_packet->header.pt == RTCP_RR) {
1448 /* RTCP RR */
1449 rtcp_packet->r.rr.ssrc = ntohl(rtcp_packet->r.rr.ssrc);
1450 if(rtcp_packet->header.count) {
1451 rtcp_rr_get(rtp_stream,rtcp_packet->r.rr.rr_stat);
1452 }
1453 }
1454 else if(rtcp_packet->header.pt == RTCP_SDES) {
1455 /* RTCP SDES */
1456 }
1457 else if(rtcp_packet->header.pt == RTCP_BYE) {
1458 /* RTCP BYE */
1459 }
1460 else {
1461 /* unknown RTCP packet */
1462 }
1463
1464 /* get next RTCP packet */
1465 rtcp_packet = (rtcp_packet_t*)((apr_uint32_t*)rtcp_packet + rtcp_packet->header.length + 1);
1466 }
1467
1468 if(rtcp_packet != rtcp_packet_end) {
1469 apt_log(APT_LOG_MARK,APT_PRIO_WARNING,"Malformed Compound RTCP Packet");
1470 return FALSE;
1471 }
1472
1473 return TRUE;
1474 }
1475
mpf_rtcp_tx_timer_proc(apt_timer_t * timer,void * obj)1476 static void mpf_rtcp_tx_timer_proc(apt_timer_t *timer, void *obj)
1477 {
1478 mpf_rtp_stream_t *rtp_stream = obj;
1479
1480 /* generate and send RTCP compound report (SR/RR + SDES) */
1481 mpf_rtcp_report_send(rtp_stream);
1482
1483 /* re-schedule timer */
1484 apt_timer_set(timer,rtp_stream->settings->rtcp_tx_interval);
1485 }
1486
mpf_rtcp_rx_timer_proc(apt_timer_t * timer,void * obj)1487 static void mpf_rtcp_rx_timer_proc(apt_timer_t *timer, void *obj)
1488 {
1489 mpf_rtp_stream_t *rtp_stream = obj;
1490 if(rtp_stream->rtcp_socket && rtp_stream->rtcp_l_sockaddr && rtp_stream->rtcp_r_sockaddr) {
1491 char buffer[MAX_RTCP_PACKET_SIZE];
1492 apr_size_t length = sizeof(buffer);
1493
1494 if(apr_socket_recv(rtp_stream->rtcp_socket,buffer,&length) == APR_SUCCESS) {
1495 apt_log(APT_LOG_MARK,APT_PRIO_INFO,"Receive Compound RTCP Packet [%"APR_SIZE_T_FMT" bytes] %s:%hu <- %s:%hu",
1496 length,
1497 rtp_stream->rtcp_l_sockaddr->hostname,
1498 rtp_stream->rtcp_l_sockaddr->port,
1499 rtp_stream->rtcp_r_sockaddr->hostname,
1500 rtp_stream->rtcp_r_sockaddr->port);
1501 mpf_rtcp_compound_packet_receive(rtp_stream,buffer,length);
1502 }
1503 }
1504
1505 /* re-schedule timer */
1506 apt_timer_set(timer,rtp_stream->settings->rtcp_rx_resolution);
1507 }
1508