1 /*
2 * Copyright (C) 2002-2003 Fhg Fokus
3 *
4 * This file is part of SEMS, a free SIP media server.
5 *
6 * SEMS is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 2 of the License, or
9 * (at your option) any later version. This program is released under
10 * the GPL with the additional exemption that compiling, linking,
11 * and/or using OpenSSL is allowed.
12 *
13 * For a license to use the SEMS software under conditions
14 * other than those described here, or to purchase support for this
15 * software, please contact iptel.org by e-mail at the following addresses:
16 * info@iptel.org
17 *
18 * SEMS is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 * GNU General Public License for more details.
22 *
23 * You should have received a copy of the GNU General Public License
24 * along with this program; if not, write to the Free Software
25 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
26 */
27
28 #include "AmRtpStream.h"
29 #include "AmRtpPacket.h"
30 #include "AmRtpReceiver.h"
31 #include "AmConfig.h"
32 #include "AmPlugIn.h"
33 #include "AmAudio.h"
34 #include "AmUtils.h"
35 #include "AmSession.h"
36 #include "AmRtpMuxStream.h"
37
38 #include "AmDtmfDetector.h"
39 #include "rtp/telephone_event.h"
40 #include "amci/codecs.h"
41 #include "AmJitterBuffer.h"
42
43 #include "sip/resolver.h"
44 #include "sip/ip_util.h"
45 #include "sip/raw_sender.h"
46 #include "sip/msg_logger.h"
47
48 #include "log.h"
49
50 #include <assert.h>
51 #include <stdlib.h>
52 #include <unistd.h>
53 #include <sys/time.h>
54 #include <sys/types.h>
55 #include <sys/socket.h>
56 #include <sys/ioctl.h>
57 #include <arpa/inet.h>
58 #include <netinet/in.h>
59
StreamInfoStreamInfo60 #ifdef WITH_ZRTP
61 #include "libzrtp/zrtp.h"
62 #endif
63
64 #include "rtp/rtp.h"
65
66 #include <set>
67 using std::set;
68
69 void PayloadMask::clear()
70 {
71 memset(bits, 0, sizeof(bits));
72 }
73
74 void PayloadMask::set_all()
RtpPacketRtpPacket75 {
76 memset(bits, 0xFF, sizeof(bits));
77 }
78
79 void PayloadMask::invert()
80 {
81 // assumes that bits[] contains 128 bits
~RtpPacketRtpPacket82 unsigned long long* ull = (unsigned long long*)bits;
83 ull[0] = ~ull[0];
84 ull[1] = ~ull[1];
85 }
86
87 PayloadMask::PayloadMask(const PayloadMask &src)
88 {
89 memcpy(bits, src.bits, sizeof(bits));
90 }
91
92 //////////////////////////////////////////////////////////////////////////////////////////////////////////
93
94 /*
95 * This function must be called before setLocalPort, because
96 * setLocalPort will bind the socket and it will be not
97 * possible to change the IP later
98 */
99 void AmRtpStream::setLocalIP(const string& ip)
100 {
101 if (!am_inet_pton(ip.c_str(), &l_saddr)) {
102 throw string ("AmRtpStream::setLocalIP: Invalid IP address: ") + ip;
103 }
104 DBG("ip = %s\n",ip.c_str());
105 }
106
107 int AmRtpStream::hasLocalSocket() {
108 return l_sd;
109 }
110
111 int AmRtpStream::getLocalSocket()
112 {
113 if (l_sd)
114 return l_sd;
115
116 int sd=0, rtcp_sd=0;
117 if((sd = socket(l_saddr.ss_family,SOCK_DGRAM,0)) == -1) {
118 ERROR("%s\n",strerror(errno));
119 throw string ("while creating new socket.");
120 }
121
122 if((rtcp_sd = socket(l_saddr.ss_family,SOCK_DGRAM,0)) == -1) {
123 ERROR("%s\n",strerror(errno));
124 throw string ("while creating new socket.");
125 }
126
127 int true_opt = 1;
128 if(ioctl(sd, FIONBIO , &true_opt) == -1){
129 ERROR("%s\n",strerror(errno));
130 close(sd);
131 throw string ("while setting socket non blocking.");
132 }
133
134 if(ioctl(rtcp_sd, FIONBIO , &true_opt) == -1){
135 ERROR("%s\n",strerror(errno));
136 close(sd);
137 throw string ("while setting socket non blocking.");
138 }
139
140 l_sd = sd;
141 l_rtcp_sd = rtcp_sd;
142
143 return l_sd;
144 }
145
146 void AmRtpStream::setLocalPort(unsigned short p)
147 {
148 if(l_port)
149 return;
150
151 if(l_if < 0) {
152 if (session) l_if = session->getRtpInterface();
153 else {
154 ERROR("BUG: no session when initializing RTP stream, invalid interface can be used\n");
155 l_if = 0;
156 }
157 }
158
159 int retry = 10;
160 unsigned short port = 0;
161 for(;retry; --retry){
162
163 if (!getLocalSocket())
164 return;
165
166 if(!p)
167 port = AmConfig::RTP_Ifs[l_if].getNextRtpPort();
168 else
169 port = p;
170
171 am_set_port(&l_saddr,port+1);
172 if(bind(l_rtcp_sd,(const struct sockaddr*)&l_saddr,SA_len(&l_saddr))) {
173 DBG("bind: %s\n",strerror(errno));
174 goto try_another_port;
175 }
176
177 am_set_port(&l_saddr,port);
178 if(bind(l_sd,(const struct sockaddr*)&l_saddr,SA_len(&l_saddr))) {
179 DBG("bind: %s\n",strerror(errno));
180 goto try_another_port;
181 }
182
183 // both bind() succeeded!
184 break;
185
186 try_another_port:
187 close(l_sd);
188 l_sd = 0;
189 close(l_rtcp_sd);
190 l_rtcp_sd = 0;
191 }
192
193 int true_opt = 1;
194 if (!retry){
195 ERROR("could not find a free RTP port\n");
196 throw string("could not find a free RTP port");
197 }
198
199 // rco: does that make sense after bind() ????
200 if(setsockopt(l_sd, SOL_SOCKET, SO_REUSEADDR,
201 (void*)&true_opt, sizeof (true_opt)) == -1) {
202
203 ERROR("%s\n",strerror(errno));
204 close(l_sd);
205 l_sd = 0;
206 throw string ("while setting local address reusable.");
207 }
208
209 l_port = port;
210 l_rtcp_port = port+1;
211
212 if(!p) {
213 AmRtpReceiver::instance()->addStream(l_sd, this);
214 AmRtpReceiver::instance()->addStream(l_rtcp_sd, this);
215 DBG("added stream [%p] to RTP receiver (%s:%i/%i)\n", this,
216 get_addr_str((sockaddr_storage*)&l_saddr).c_str(),l_port,l_rtcp_port);
217 }
218
219 memcpy(&l_rtcp_saddr, &l_saddr, sizeof(l_saddr));
220 am_set_port(&l_rtcp_saddr, l_rtcp_port);
221 }
222
223 int AmRtpStream::ping()
224 {
225 // TODO:
226 // - we'd better send an empty UDP packet
227 // for this purpose.
228
229 unsigned char ping_chr[2];
230
231 ping_chr[0] = 0;
232 ping_chr[1] = 0;
233
234 AmRtpPacket rp;
235 rp.payload = payload;
236 rp.marker = true;
237 rp.sequence = sequence++;
238 rp.timestamp = 0;
239 rp.ssrc = l_ssrc;
240 rp.compile((unsigned char*)ping_chr,2);
241
242 rp.setAddr(&r_saddr);
243 if(rp.send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx,&l_saddr) < 0){
244 ERROR("while sending RTP packet.\n");
245 return -1;
246 }
247
248 return 2;
249 }
250
251 int AmRtpStream::compile_and_send(const int payload, bool marker, unsigned int ts,
252 unsigned char* buffer, unsigned int size) {
253 AmRtpPacket rp;
254 rp.payload = payload;
255 rp.timestamp = ts;
256 rp.marker = marker;
257 rp.sequence = sequence++;
258 rp.ssrc = l_ssrc;
259 rp.compile((unsigned char*)buffer,size);
260
261 rp.setAddr(&r_saddr);
262
263 #ifdef WITH_ZRTP
264 if (session && session->enable_zrtp){
265 if (NULL == session->zrtp_session_state.zrtp_audio) {
266 ERROR("ZRTP enabled on session, but no audio stream created\n");
267 return -1;
268 }
269
270 unsigned int size = rp.getBufferSize();
271 zrtp_status_t status = zrtp_process_rtp(session->zrtp_session_state.zrtp_audio,
272 (char*)rp.getBuffer(), &size);
273 switch (status) {
274 case zrtp_status_drop: {
275 DBG("ZRTP says: drop packet! %u - %u\n", size, rp.getBufferSize());
276 return 0;
277 }
278 case zrtp_status_ok: {
279 // DBG("ZRTP says: ok!\n");
280 if (rp.getBufferSize() != size)
281 // DBG("SEND packet size before: %d, after %d\n",
282 // rp.getBufferSize(), size);
283 rp.setBufferSize(size);
284 } break;
285 default:
286 case zrtp_status_fail: {
287 DBG("ZRTP says: fail!\n");
288 // DBG("(f)");
289 return 0;
290 }
291
292 }
293
294 }
295 #endif
296
297 if(rp.send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx, &l_saddr, rtp_mux_remote_ip, rtp_mux_remote_port) < 0){
298 ERROR("while sending RTP packet.\n");
299 return -1;
300 }
301
302 if (logger) rp.logSent(logger, &l_saddr);
303
304 return size;
305 }
306
307 int AmRtpStream::send( unsigned int ts, unsigned char* buffer, unsigned int size )
308 {
309 if ((mute) || (hold))
310 return 0;
311
312 if(remote_telephone_event_pt.get())
313 dtmf_sender.sendPacket(ts,remote_telephone_event_pt->payload_type,this);
314
315 if(!size)
316 return -1;
317
318 PayloadMappingTable::iterator it = pl_map.find(payload);
319 if ((it == pl_map.end()) || (it->second.remote_pt < 0)) {
320 ERROR("sending packet with unsupported remote payload type %d\n", payload);
321 return -1;
322 }
323
324 return compile_and_send(it->second.remote_pt, false, ts, buffer, size);
325 }
326
327 int AmRtpStream::send_raw( char* packet, unsigned int length )
328 {
329 if ((mute) || (hold))
330 return 0;
331
332 AmRtpPacket rp;
333 rp.compile_raw((unsigned char*)packet, length);
334 rp.setAddr(&r_saddr);
335
336 if(rp.send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx, &l_saddr, rtp_mux_remote_ip, rtp_mux_remote_port) < 0){
337 ERROR("while sending raw RTP packet.\n");
338 return -1;
339 }
340
341 if (logger) rp.logSent(logger, &l_saddr);
342
343 return length;
344 }
345
346 // returns
347 // @param ts [out] timestamp of the received packet,
348 // in audio buffer relative time
349 // @param audio_buffer_ts [in] current ts at the audio_buffer
350
351 int AmRtpStream::receive( unsigned char* buffer, unsigned int size,
352 unsigned int& ts, int &out_payload)
353 {
354 AmRtpPacket* rp = NULL;
355 int err = nextPacket(rp);
356
357 if(err <= 0)
358 return err;
359
360 if (!rp)
361 return 0;
362
363 handleSymmetricRtp(&rp->addr,false);
364
365 /* do we have a new talk spurt? */
366 begin_talk = ((last_payload == 13) || rp->marker);
367 last_payload = rp->payload;
368
369 if(!rp->getDataSize()) {
370 mem.freePacket(rp);
371 return RTP_EMPTY;
372 }
373
374 if (rp->payload == getLocalTelephoneEventPT())
375 {
376 recvDtmfPacket(rp);
377 mem.freePacket(rp);
378 return RTP_DTMF;
379 }
380
381 assert(rp->getData());
382 if(rp->getDataSize() > size){
383 ERROR("received too big RTP packet\n");
384 mem.freePacket(rp);
385 return RTP_BUFFER_SIZE;
386 }
387
388 memcpy(buffer,rp->getData(),rp->getDataSize());
389 ts = rp->timestamp;
390 out_payload = rp->payload;
391
392 int res = rp->getDataSize();
393 mem.freePacket(rp);
394 return res;
395 }
396
397 AmRtpStream::AmRtpStream(AmSession* _s, int _if)
398 : sdp_media_index(-1),
399 r_port(0),
400 l_if(_if),
401 l_port(0),
402 l_sd(0),
403 r_ssrc_i(false),
404 passive(false),
405 passive_rtcp(false),
406 hold(false),
407 remotehold(false),
408 monitor_rtp_timeout(true),
409 receiving(true),
410 relay_enabled(false),
411 relay_raw(false),
412 relay_stream(NULL),
413 relay_transparent_seqno(true),
414 relay_transparent_ssrc(true),
415 relay_filter_dtmf(false),
416 session(_s),
417 logger(NULL),
418 offer_answer_used(true),
419 active(false), // do not return any data unless something really received
420 mute(false),
421 force_receive_dtmf(false)
422 {
423
424 memset(&r_saddr,0,sizeof(struct sockaddr_storage));
425 memset(&l_saddr,0,sizeof(struct sockaddr_storage));
426
427 l_ssrc = get_random();
428 sequence = get_random();
429 clearRTPTimeout();
430
431 // by default the system codecs
432 payload_provider = AmPlugIn::instance();
433 }
434
435 AmRtpStream::~AmRtpStream()
436 {
437 if (!rtp_mux_remote_ip.empty()) {
438 DBG("RTP MUX: closing on RTP stream instance [%p]\n", this);
439 AmRtpMuxSender::instance()->close(rtp_mux_remote_ip, rtp_mux_remote_port, getRPort());
440 }
441
442 if(l_sd){
443 if (AmRtpReceiver::haveInstance()){
444 AmRtpReceiver::instance()->removeStream(l_sd, getLocalPort());
445 AmRtpReceiver::instance()->removeStream(l_rtcp_sd, getLocalRtcpPort());
446 }
447 close(l_sd);
448 close(l_rtcp_sd);
449 }
450 if (logger) dec_ref(logger);
451 }
452
453 int AmRtpStream::getLocalPort()
454 {
455 // if (hold)
456 // return 0;
457
458 if(!l_port)
459 setLocalPort();
460
461 return l_port;
462 }
463
464 int AmRtpStream::getLocalRtcpPort()
465 {
466 if(!l_rtcp_port)
467 setLocalPort();
468
469 return l_rtcp_port;
470 }
471
472 int AmRtpStream::getRPort()
473 {
474 return r_port;
475 }
476
477 string AmRtpStream::getRHost()
478 {
479 return r_host;
480 }
481
482 void AmRtpStream::setRAddr(const string& addr, unsigned short port,
483 unsigned short rtcp_port)
484 {
485 DBG("RTP remote address set to %s:(%u/%u)\n",
486 addr.c_str(),port,rtcp_port);
487
488 struct sockaddr_storage ss;
489 memset (&ss, 0, sizeof (ss));
490
491 /* inet_aton only supports dot-notation IP address strings... but an RFC
492 * 4566 unicast-address, as found in c=, can be an FQDN (or other!).
493 */
494 dns_handle dh;
495 if (resolver::instance()->resolve_name(addr.c_str(),&dh,&ss,IPv4) < 0) {
496 WARN("Address not valid (host: %s).\n", addr.c_str());
497 throw string("invalid address") + addr;
498 }
499
500 r_host = addr;
501 if(port) r_port = port;
502 if(rtcp_port) r_rtcp_port = rtcp_port;
503
504 memcpy(&r_saddr,&ss,sizeof(struct sockaddr_storage));
505 am_set_port(&r_saddr,r_port);
506
507 mute = ((r_saddr.ss_family == AF_INET) &&
508 (SAv4(&r_saddr)->sin_addr.s_addr == INADDR_ANY)) ||
509 ((r_saddr.ss_family == AF_INET6) &&
510 IN6_IS_ADDR_UNSPECIFIED(&SAv6(&r_saddr)->sin6_addr));
511 }
512
513 void AmRtpStream::handleSymmetricRtp(struct sockaddr_storage* recv_addr, bool rtcp) {
514
515 if((!rtcp && passive) || (rtcp && passive_rtcp)) {
516
517 struct sockaddr_in* in_recv = (struct sockaddr_in*)recv_addr;
518 struct sockaddr_in6* in6_recv = (struct sockaddr_in6*)recv_addr;
519
520 struct sockaddr_in* in_addr = (struct sockaddr_in*)&r_saddr;
521 struct sockaddr_in6* in6_addr = (struct sockaddr_in6*)&r_saddr;
522
523 unsigned short port = am_get_port(recv_addr);
524
525 // symmetric RTP
526 if ( (!rtcp && (port != r_port)) || (rtcp && (port != r_rtcp_port)) ||
527 ((recv_addr->ss_family == AF_INET) &&
528 (in_addr->sin_addr.s_addr != in_recv->sin_addr.s_addr)) ||
529 ((recv_addr->ss_family == AF_INET6) &&
530 (memcmp(&in6_addr->sin6_addr,
531 &in6_recv->sin6_addr,
532 sizeof(struct in6_addr))))
533 ) {
534
535 string addr_str = get_addr_str(recv_addr);
536 setRAddr(addr_str, !rtcp ? port : 0, rtcp ? port : 0);
537 DBG("Symmetric %s: setting new remote address: %s:%i\n",
538 !rtcp ? "RTP" : "RTCP", addr_str.c_str(),port);
539
540 } else {
541 const char* prot = rtcp ? "RTCP" : "RTP";
542 DBG("Symmetric %s: remote end sends %s from advertised address."
543 " Leaving passive mode.\n",prot,prot);
544 }
545
546 // avoid comparing each time sender address
547 if(!rtcp)
548 passive = false;
549 else
550 passive_rtcp = false;
551 }
552 }
553
554 void AmRtpStream::setPassiveMode(bool p)
555 {
556 passive_rtcp = passive = p;
557 if (p) {
558 DBG("The other UA is NATed or passive mode forced: switched to passive mode.\n");
559 } else {
560 DBG("Passive mode not activated.\n");
561 }
562 }
563
564 void AmRtpStream::getSdp(SdpMedia& m)
565 {
566 m.port = getLocalPort();
567 m.nports = 0;
568 m.transport = TP_RTPAVP;
569 m.send = !hold;
570 m.recv = !remotehold;
571 m.dir = SdpMedia::DirBoth;
572 }
573
574 void AmRtpStream::getSdpOffer(unsigned int index, SdpMedia& offer)
575 {
576 sdp_media_index = index;
577 getSdp(offer);
578 offer.payloads.clear();
579 payload_provider->getPayloads(offer.payloads);
580 }
581
582 void AmRtpStream::getSdpAnswer(unsigned int index, const SdpMedia& offer, SdpMedia& answer)
583 {
584 sdp_media_index = index;
585 getSdp(answer);
586 offer.calcAnswer(payload_provider,answer);
587 }
588
589 int AmRtpStream::init(const AmSdp& local,
590 const AmSdp& remote,
591 bool force_passive_mode)
592 {
593 if((sdp_media_index < 0) ||
594 ((unsigned)sdp_media_index >= local.media.size()) ||
595 ((unsigned)sdp_media_index >= remote.media.size()) ) {
596
597 ERROR("Media index %i is invalid, either within local or remote SDP (or both)",sdp_media_index);
598 return -1;
599 }
600
601 const SdpMedia& local_media = local.media[sdp_media_index];
602 const SdpMedia& remote_media = remote.media[sdp_media_index];
603
604 payloads.clear();
605 pl_map.clear();
606 payloads.resize(local_media.payloads.size());
607
608 int i=0;
609 vector<SdpPayload>::const_iterator sdp_it = local_media.payloads.begin();
610 vector<Payload>::iterator p_it = payloads.begin();
611
612 // first pass on local SDP - fill pl_map with intersection of codecs
613 while(sdp_it != local_media.payloads.end()) {
614
615 int int_pt;
616
617 if (local_media.transport == TP_RTPAVP && sdp_it->payload_type < 20) int_pt = sdp_it->payload_type;
618 else int_pt = payload_provider->getDynPayload(sdp_it->encoding_name,
619 sdp_it->clock_rate,
620 sdp_it->encoding_param);
621
622 amci_payload_t* a_pl = NULL;
623 if(int_pt >= 0)
624 a_pl = payload_provider->payload(int_pt);
625
626 if(a_pl == NULL){
627 if (relay_payloads.get(sdp_it->payload_type)) {
628 // this payload should be relayed, ignore
629 ++sdp_it;
630 continue;
631 } else {
632 DBG("No internal payload corresponding to type %s/%i (ignoring)\n",
633 sdp_it->encoding_name.c_str(),
634 sdp_it->clock_rate);
635 // ignore this payload
636 ++sdp_it;
637 continue;
638 }
639 };
640
641 p_it->pt = sdp_it->payload_type;
642 p_it->name = sdp_it->encoding_name;
643 p_it->codec_id = a_pl->codec_id;
644 p_it->clock_rate = a_pl->sample_rate;
645 p_it->advertised_clock_rate = sdp_it->clock_rate;
646 p_it->format_parameters = sdp_it->sdp_format_parameters;
647
648 pl_map[sdp_it->payload_type].index = i;
649 pl_map[sdp_it->payload_type].remote_pt = -1;
650
651 ++p_it;
652 ++sdp_it;
653 ++i;
654 }
655
656 // remove payloads which were not initialised (because of unknown payloads
657 // which are to be relayed)
658 if (p_it != payloads.end()) payloads.erase(p_it, payloads.end());
659
660 // second pass on remote SDP - initialize payload IDs used by remote (remote_pt)
661 sdp_it = remote_media.payloads.begin();
662 while(sdp_it != remote_media.payloads.end()) {
663
664 // TODO: match not only on encoding name
665 // but also on parameters, if necessary
666 // Some codecs define multiple payloads
667 // with different encoding parameters
668 PayloadMappingTable::iterator pmt_it = pl_map.end();
669 if(sdp_it->encoding_name.empty() || (local_media.transport == TP_RTPAVP && sdp_it->payload_type < 20)){
670 // must be a static payload
671 pmt_it = pl_map.find(sdp_it->payload_type);
672 }
673 else {
674 for(p_it = payloads.begin(); p_it != payloads.end(); ++p_it){
675
676 if(!strcasecmp(p_it->name.c_str(),sdp_it->encoding_name.c_str()) &&
677 (p_it->advertised_clock_rate == (unsigned int)sdp_it->clock_rate)){
678 pmt_it = pl_map.find(p_it->pt);
679 break;
680 }
681 }
682 }
683
684 // TODO: remove following code once proper
685 // payload matching is implemented
686 //
687 // initialize remote_pt if not already there
688 if(pmt_it != pl_map.end() && (pmt_it->second.remote_pt < 0)){
689 pmt_it->second.remote_pt = sdp_it->payload_type;
690 }
691 ++sdp_it;
692 }
693
694 if(!l_port){
695 // only if socket not yet bound:
696 if(session) {
697 setLocalIP(session->localMediaIP());
698 }
699 else {
700 // set local address - media c-line having precedence over session c-line
701 if (local_media.conn.address.empty())
702 setLocalIP(local.conn.address);
703 else
704 setLocalIP(local_media.conn.address);
705 }
706
707 DBG("setting local port to %i",local_media.port);
708 setLocalPort(local_media.port);
709 }
710
711 setPassiveMode(remote_media.dir == SdpMedia::DirActive || force_passive_mode);
712
713 // set remote address - media c-line having precedence over session c-line
714 if (remote.conn.address.empty() && remote_media.conn.address.empty()) {
715 WARN("no c= line given globally or in m= section in remote SDP\n");
716 return -1;
717 }
718 if (remote_media.conn.address.empty())
719 setRAddr(remote.conn.address, remote_media.port, remote_media.port+1);
720 else
721 setRAddr(remote_media.conn.address, remote_media.port, remote_media.port+1);
722
723 if(local_media.payloads.empty()) {
724 DBG("local_media.payloads.empty()\n");
725 return -1;
726 }
727
728 remote_telephone_event_pt.reset(remote.telephoneEventPayload());
729 if (remote_telephone_event_pt.get()) {
730 DBG("remote party supports telephone events (pt=%i)\n",
731 remote_telephone_event_pt->payload_type);
732 } else {
733 DBG("remote party doesn't support telephone events\n");
734 }
735
736 local_telephone_event_pt.reset(local.telephoneEventPayload());
737
738 if(local_media.recv) {
739 resume();
740 } else {
741 pause();
742 }
743
744 if(local_media.send && !hold
745 && (remote_media.port != 0)
746 && (((r_saddr.ss_family == AF_INET)
747 && (SAv4(&r_saddr)->sin_addr.s_addr != 0)) ||
748 ((r_saddr.ss_family == AF_INET6)
749 && (!IN6_IS_ADDR_UNSPECIFIED(&SAv6(&r_saddr)->sin6_addr))))
750 ) {
751 mute = false;
752 } else {
753 mute = true;
754 }
755
756 payload = getDefaultPT();
757 if(payload < 0) {
758 DBG("could not set a default payload\n");
759 return -1;
760 }
761 DBG("default payload selected = %i\n",payload);
762 last_payload = payload;
763
764 active = false; // mark as nothing received yet
765 return 0;
766 }
767
768 void AmRtpStream::setReceiving(bool r) {
769 DBG("RTP stream instance [%p] set receiving=%s\n", this, r?"true":"false");
770 receiving = r;
771 }
772
773 void AmRtpStream::pause()
774 {
775 DBG("RTP Stream instance [%p] pausing (receiving=false)\n", this);
776 receiving = false;
777
778 #ifdef WITH_ZRTP
779 if (session && session->enable_zrtp) {
780 session->zrtp_session_state.stopStreams();
781 }
782 #endif
783
784 }
785
786 void AmRtpStream::resume()
787 {
788 DBG("RTP Stream instance [%p] resuming (receiving=true, clearing biffers/TS/TO)\n", this);
789 clearRTPTimeout();
790 receive_mut.lock();
791 mem.clear();
792 receive_buf.clear();
793 while (!rtp_ev_qu.empty())
794 rtp_ev_qu.pop();
795 receive_mut.unlock();
796 receiving = true;
797
798 #ifdef WITH_ZRTP
799 if (session && session->enable_zrtp) {
800 session->zrtp_session_state.startStreams(get_ssrc());
801 }
802 #endif
803 }
804
805 void AmRtpStream::setOnHold(bool on_hold) {
806 hold = on_hold;
807 }
808
809 bool AmRtpStream::getOnHold() {
810 return hold;
811 }
812
813 void AmRtpStream::setRemoteHold(bool remote_hold) {
814 remotehold = remote_hold;
815 }
816
817 bool AmRtpStream::getRemoteHold() {
818 return remotehold;
819 }
820
821 void AmRtpStream::recvDtmfPacket(AmRtpPacket* p) {
822 if (p->payload == getLocalTelephoneEventPT()) {
823 dtmf_payload_t* dpl = (dtmf_payload_t*)p->getData();
824
825 DBG("DTMF: event=%i; e=%i; r=%i; volume=%i; duration=%i; ts=%u session = [%p]\n",
826 dpl->event,dpl->e,dpl->r,dpl->volume,ntohs(dpl->duration),p->timestamp, session);
827 if (session)
828 session->postDtmfEvent(new AmRtpDtmfEvent(dpl, getLocalTelephoneEventRate(), p->timestamp));
829 }
830 }
831
832 void AmRtpStream::bufferPacket(AmRtpPacket* p)
833 {
834 clearRTPTimeout(&p->recv_time);
835
836 if (!receiving) {
837
838 if (passive) {
839 handleSymmetricRtp(&p->addr,false);
840 }
841
842 if (force_receive_dtmf) {
843 recvDtmfPacket(p);
844 }
845
846 mem.freePacket(p);
847 return;
848 }
849
850 if (relay_enabled) { // todo: ZRTP
851 if (force_receive_dtmf) {
852 recvDtmfPacket(p);
853 }
854
855 // Relay DTMF packets if current audio payload
856 // is also relayed.
857 // Else, check whether or not we should relay this payload
858
859 bool is_dtmf_packet = (p->payload == getLocalTelephoneEventPT());
860
861 if (relay_raw || (is_dtmf_packet && !active) ||
862 relay_payloads.get(p->payload)) {
863
864 if(active){
865 DBG("switching to relay-mode\t(ts=%u;stream=%p)\n",
866 p->timestamp,this);
867 active = false;
868 }
869 handleSymmetricRtp(&p->addr,false);
870
871 if (NULL != relay_stream &&
872 (!(relay_filter_dtmf && is_dtmf_packet))) {
873 relay_stream->relay(p);
874 }
875
876 mem.freePacket(p);
877 return;
878 }
879 }
880
881 #ifndef WITH_ZRTP
882 // throw away ZRTP packets
883 if(p->version != RTP_VERSION) {
884 mem.freePacket(p);
885 return;
886 }
887 #endif
888
889 receive_mut.lock();
890
891 #ifdef WITH_ZRTP
892 if (session && session->enable_zrtp) {
893
894 if (NULL == session->zrtp_session_state.zrtp_audio) {
895 WARN("dropping received packet, as there's no ZRTP stream initialized\n");
896 receive_mut.unlock();
897 mem.freePacket(p);
898 return;
899 }
900
901 unsigned int size = p->getBufferSize();
902 zrtp_status_t status = zrtp_process_srtp(session->zrtp_session_state.zrtp_audio, (char*)p->getBuffer(), &size);
903 switch (status)
904 {
905 case zrtp_status_ok: {
906 p->setBufferSize(size);
907 if (p->parse() < 0) {
908 ERROR("parsing decoded packet!\n");
909 mem.freePacket(p);
910 } else {
911
912 if(p->payload == getLocalTelephoneEventPT()) {
913 rtp_ev_qu.push(p);
914 } else {
915 if(!receive_buf.insert(ReceiveBuffer::value_type(p->timestamp,p)).second) {
916 // insert failed
917 mem.freePacket(p);
918 }
919 }
920
921 }
922 } break;
923
924 case zrtp_status_drop: {
925 //
926 // This is a protocol ZRTP packet or masked RTP media.
927 // In either case the packet must be dropped to protect your
928 // media codec
929 mem.freePacket(p);
930
931 } break;
932
933 case zrtp_status_fail:
934 default: {
935 ERROR("zrtp_status_fail!\n");
936 //
937 // This is some kind of error - see logs for more information
938 //
939 mem.freePacket(p);
940 } break;
941 }
942 } else {
943 #endif // WITH_ZRTP
944
945 if(p->payload == getLocalTelephoneEventPT()) {
946 rtp_ev_qu.push(p);
947 } else {
948 if(!receive_buf.insert(ReceiveBuffer::value_type(p->timestamp,p)).second) {
949 // insert failed
950 mem.freePacket(p);
951 }
952 }
953
954 #ifdef WITH_ZRTP
955 }
956 #endif
957 receive_mut.unlock();
958 }
959
960 void AmRtpStream::clearRTPTimeout(struct timeval* recv_time) {
961 memcpy(&last_recv_time, recv_time, sizeof(struct timeval));
962 }
963
964 void AmRtpStream::clearRTPTimeout() {
965 gettimeofday(&last_recv_time,NULL);
966 }
967
968 int AmRtpStream::getDefaultPT()
969 {
970 for(PayloadCollection::iterator it = payloads.begin();
971 it != payloads.end(); ++it){
972
973 // skip telephone-events payload
974 if(it->codec_id == CODEC_TELEPHONE_EVENT)
975 continue;
976
977 // skip incompatible payloads
978 PayloadMappingTable::iterator pl_it = pl_map.find(it->pt);
979 if ((pl_it == pl_map.end()) || (pl_it->second.remote_pt < 0))
980 continue;
981
982 return it->pt;
983 }
984
985 return -1;
986 }
987
988 int AmRtpStream::nextPacket(AmRtpPacket*& p)
989 {
990 if (!receiving && !passive)
991 return RTP_EMPTY;
992
993 struct timeval now;
994 struct timeval diff;
995 gettimeofday(&now,NULL);
996
997 receive_mut.lock();
998 timersub(&now,&last_recv_time,&diff);
999 if(monitor_rtp_timeout &&
1000 AmConfig::DeadRtpTime &&
1001 (diff.tv_sec > 0) &&
1002 ((unsigned int)diff.tv_sec > AmConfig::DeadRtpTime)){
1003 WARN("RTP Timeout detected. Last received packet is too old "
1004 "(diff.tv_sec = %i\n",(unsigned int)diff.tv_sec);
1005 receive_mut.unlock();
1006 return RTP_TIMEOUT;
1007 }
1008
1009 if(!rtp_ev_qu.empty()) {
1010 // first return RTP telephone event payloads
1011 p = rtp_ev_qu.front();
1012 rtp_ev_qu.pop();
1013 receive_mut.unlock();
1014 return 1;
1015 }
1016
1017 if(receive_buf.empty()){
1018 receive_mut.unlock();
1019 return RTP_EMPTY;
1020 }
1021
1022 p = receive_buf.begin()->second;
1023 receive_buf.erase(receive_buf.begin());
1024 receive_mut.unlock();
1025
1026 return 1;
1027 }
1028
1029 AmRtpPacket *AmRtpStream::reuseBufferedPacket()
1030 {
1031 AmRtpPacket *p = NULL;
1032
1033 receive_mut.lock();
1034 if(!receive_buf.empty()) {
1035 p = receive_buf.begin()->second;
1036 receive_buf.erase(receive_buf.begin());
1037 }
1038 receive_mut.unlock();
1039 return p;
1040 }
1041
1042 void AmRtpStream::recvPacket(int fd, unsigned char* pkt, size_t len)
1043 {
1044 if(fd == l_rtcp_sd){
1045 recvRtcpPacket();
1046 return;
1047 }
1048
1049 AmRtpPacket* p = mem.newPacket();
1050 if (!p) p = reuseBufferedPacket();
1051 if (!p) {
1052 DBG("out of buffers for RTP packets, dropping (stream [%p])\n",
1053 this);
1054 // drop received data
1055 if (NULL != pkt) {
1056 AmRtpPacket dummy;
1057 dummy.recv(l_sd);
1058 }
1059 return;
1060 }
1061
1062 int recv_res = 0;
1063 if (NULL != pkt) {
1064 // "receive" from buffer
1065 recv_res = p->recv(pkt, len);
1066 } else {
1067 // receive from network
1068 recv_res = p->recv(l_sd);
1069 }
1070
1071 if(recv_res > 0){
1072 int parse_res = 0;
1073
1074 if (logger) p->logReceived(logger, &l_saddr);
1075
1076 gettimeofday(&p->recv_time,NULL);
1077
1078 if(!relay_raw
1079 #ifdef WITH_ZRTP
1080 && !(session && session->enable_zrtp)
1081 #endif
1082 ) {
1083 parse_res = p->parse();
1084 }
1085
1086 if (parse_res == -1) {
1087 DBG("error while parsing RTP packet.\n");
1088 clearRTPTimeout(&p->recv_time);
1089 mem.freePacket(p);
1090 } else {
1091 bufferPacket(p);
1092 }
1093 } else {
1094 mem.freePacket(p);
1095 }
1096 }
1097
1098 void AmRtpStream::recvRtcpPacket()
1099 {
1100 struct sockaddr_storage recv_addr;
1101 socklen_t recv_addr_len = sizeof(recv_addr);
1102 unsigned char buffer[4096];
1103
1104 int recved_bytes = recvfrom(l_rtcp_sd,buffer,sizeof(buffer),0,
1105 (struct sockaddr*)&recv_addr,
1106 &recv_addr_len);
1107
1108 if(recved_bytes < 0) {
1109 if((errno != EINTR) &&
1110 (errno != EAGAIN)) {
1111 ERROR("rtcp recv(%d): %s",l_rtcp_sd,strerror(errno));
1112 }
1113 return;
1114 }
1115 else
1116 if(!recved_bytes) return;
1117
1118 static const cstring empty;
1119 if (logger)
1120 logger->log((const char *)buffer, recved_bytes, &recv_addr, &l_rtcp_saddr, empty);
1121
1122 // clear RTP timer
1123 clearRTPTimeout();
1124
1125 handleSymmetricRtp(&recv_addr,true);
1126
1127 if(!relay_enabled || !relay_stream ||
1128 !relay_stream->l_sd)
1129 return;
1130
1131 if((size_t)recved_bytes > sizeof(buffer)) {
1132 ERROR("recved huge RTCP packet (%d)",recved_bytes);
1133 return;
1134 }
1135
1136 struct sockaddr_storage rtcp_raddr;
1137 memcpy(&rtcp_raddr,&relay_stream->r_saddr,sizeof(rtcp_raddr));
1138 am_set_port(&rtcp_raddr, relay_stream->r_rtcp_port);
1139
1140 // FIXME: RTCP relay RTP MUX
1141 int err;
1142 if(AmConfig::UseRawSockets) {
1143 err = raw_sender::send((char*)buffer,recved_bytes,
1144 AmConfig::RTP_Ifs[l_if].NetIfIdx,
1145 &relay_stream->l_saddr,
1146 &rtcp_raddr);
1147 }
1148 else {
1149 err = sendto(relay_stream->l_rtcp_sd,buffer,recved_bytes,0,
1150 (const struct sockaddr *)&rtcp_raddr,
1151 SA_len(&rtcp_raddr));
1152 }
1153
1154 if(err < 0){
1155 ERROR("could not relay RTCP packet: %s\n",strerror(errno));
1156 return;
1157 }
1158
1159 if (logger)
1160 logger->log((const char *)buffer, recved_bytes, &relay_stream->l_rtcp_saddr, &rtcp_raddr, empty);
1161
1162 }
1163
1164 void AmRtpStream::relay(AmRtpPacket* p)
1165 {
1166 // not yet initialized
1167 // or muted/on-hold
1168 if (!l_port || mute || hold)
1169 return;
1170
1171 if(session && !session->onBeforeRTPRelay(p,&r_saddr))
1172 return;
1173
1174 rtp_hdr_t* hdr = (rtp_hdr_t*)p->getBuffer();
1175 if (!relay_raw && !relay_transparent_seqno)
1176 hdr->seq = htons(sequence++);
1177 if (!relay_raw && !relay_transparent_ssrc)
1178 hdr->ssrc = htonl(l_ssrc);
1179 p->setAddr(&r_saddr);
1180
1181 if(p->send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx, &l_saddr, rtp_mux_remote_ip, rtp_mux_remote_port) < 0){
1182 ERROR("while sending RTP packet to '%s':%i\n",
1183 get_addr_str(&r_saddr).c_str(),am_get_port(&r_saddr));
1184 }
1185 else {
1186 if (logger) p->logSent(logger, &l_saddr);
1187 if(session) session->onAfterRTPRelay(p,&r_saddr);
1188 }
1189 }
1190
1191 int AmRtpStream::getLocalTelephoneEventRate()
1192 {
1193 if (local_telephone_event_pt.get())
1194 return local_telephone_event_pt->clock_rate;
1195 return 0;
1196 }
1197
1198 int AmRtpStream::getLocalTelephoneEventPT()
1199 {
1200 if(local_telephone_event_pt.get())
1201 return local_telephone_event_pt->payload_type;
1202 return -1;
1203 }
1204
1205 void AmRtpStream::setPayloadProvider(AmPayloadProvider* pl_prov)
1206 {
1207 payload_provider = pl_prov;
1208 }
1209
1210 void AmRtpStream::sendDtmf(int event, unsigned int duration_ms) {
1211 dtmf_sender.queueEvent(event,duration_ms,getLocalTelephoneEventRate());
1212 }
1213
1214 void AmRtpStream::setRelayStream(AmRtpStream* stream) {
1215 relay_stream = stream;
1216 DBG("set relay stream [%p] for RTP instance [%p]\n",
1217 stream, this);
1218 }
1219
1220 void AmRtpStream::setRelayPayloads(const PayloadMask &_relay_payloads)
1221 {
1222 relay_payloads = _relay_payloads;
1223 }
1224
1225 void AmRtpStream::enableRtpRelay() {
1226 DBG("enabled RTP relay for RTP stream instance [%p]\n", this);
1227 relay_enabled = true;
1228 }
1229
1230 void AmRtpStream::disableRtpRelay() {
1231 DBG("disabled RTP relay for RTP stream instance [%p]\n", this);
1232 relay_enabled = false;
1233 }
1234
1235 void AmRtpStream::enableRawRelay()
1236 {
1237 DBG("enabled RAW relay for RTP stream instance [%p]\n", this);
1238 relay_raw = true;
1239 }
1240
1241 void AmRtpStream::disableRawRelay()
1242 {
1243 DBG("disabled RAW relay for RTP stream instance [%p]\n", this);
1244 relay_raw = false;
1245 }
1246
1247 void AmRtpStream::setRtpRelayTransparentSeqno(bool transparent) {
1248 DBG("%sabled RTP relay transparent seqno for RTP stream instance [%p]\n",
1249 transparent ? "en":"dis", this);
1250 relay_transparent_seqno = transparent;
1251 }
1252
1253 void AmRtpStream::setRtpRelayTransparentSSRC(bool transparent) {
1254 DBG("%sabled RTP relay transparent SSRC for RTP stream instance [%p]\n",
1255 transparent ? "en":"dis", this);
1256 relay_transparent_ssrc = transparent;
1257 }
1258
1259 void AmRtpStream::setRtpRelayFilterRtpDtmf(bool filter) {
1260 DBG("%sabled RTP relay filtering of RTP DTMF (2833 / 4733) for RTP stream instance [%p]\n",
1261 filter ? "en":"dis", this);
1262 relay_filter_dtmf = filter;
1263 }
1264
1265 void AmRtpStream::setRtpMuxRemote(const string& remote_ip, unsigned int remote_port) {
1266 rtp_mux_remote_ip = remote_ip;
1267 rtp_mux_remote_port = remote_port;
1268 if (remote_ip.empty() || !remote_port) {
1269 DBG("RTP stream [%p]: disabled RTP MUX\n", this);
1270 } else {
1271 DBG("RTP stream [%p]: set RTP MUX remote to %s:%u\n", this, remote_ip.c_str(), remote_port);
1272 }
1273 }
1274
1275 void AmRtpStream::stopReceiving()
1276 {
1277 if (hasLocalSocket()){
1278 DBG("remove stream [%p] from RTP receiver\n", this);
1279 AmRtpReceiver::instance()->removeStream(getLocalSocket(), getLocalPort());
1280 if (l_rtcp_sd > 0) AmRtpReceiver::instance()->removeStream(l_rtcp_sd, getLocalRtcpPort());
1281 }
1282 }
1283
1284 void AmRtpStream::resumeReceiving()
1285 {
1286 if (hasLocalSocket()){
1287 DBG("add/resume stream [%p] into RTP receiver\n",this);
1288 AmRtpReceiver::instance()->addStream(getLocalSocket(), this);
1289 if (l_rtcp_sd > 0) AmRtpReceiver::instance()->addStream(l_rtcp_sd, this);
1290 }
1291 }
1292
1293 string AmRtpStream::getPayloadName(int payload_type)
1294 {
1295 for(PayloadCollection::iterator it = payloads.begin();
1296 it != payloads.end(); ++it){
1297
1298 if (it->pt == payload_type) return it->name;
1299 }
1300
1301 return string("");
1302 }
1303
1304 PacketMem::PacketMem()
1305 : cur_idx(0), n_used(0)
1306 {
1307 memset(used, 0, sizeof(used));
1308 }
1309
1310 inline AmRtpPacket* PacketMem::newPacket()
1311 {
1312 if(n_used >= MAX_PACKETS)
1313 return NULL; // full
1314
1315 while(used[cur_idx])
1316 cur_idx = (cur_idx + 1) & MAX_PACKETS_MASK;
1317
1318 used[cur_idx] = true;
1319 n_used++;
1320
1321 AmRtpPacket* p = &packets[cur_idx];
1322 cur_idx = (cur_idx + 1) & MAX_PACKETS_MASK;
1323
1324 return p;
1325 }
1326
1327 inline void PacketMem::freePacket(AmRtpPacket* p)
1328 {
1329 if (!p) return;
1330
1331 int idx = p-packets;
1332 assert(idx >= 0);
1333 assert(idx < MAX_PACKETS);
1334
1335 if(!used[idx]) {
1336 ERROR("freePacket() double free: n_used = %d, idx = %d",n_used,idx);
1337 return;
1338 }
1339
1340 used[p-packets] = false;
1341 n_used--;
1342 }
1343
1344 inline void PacketMem::clear()
1345 {
1346 memset(used, 0, sizeof(used));
1347 n_used = cur_idx = 0;
1348 }
1349
1350 void AmRtpStream::setLogger(msg_logger* _logger)
1351 {
1352 if (logger) dec_ref(logger);
1353 logger = _logger;
1354 if (logger) inc_ref(logger);
1355 }
1356
1357 void AmRtpStream::debug()
1358 {
1359 #define BOOL_STR(b) ((b) ? "yes" : "no")
1360
1361 if(hasLocalSocket() > 0) {
1362 DBG("\t<%i> <-> <%s:%i>", getLocalPort(),
1363 getRHost().c_str(), getRPort());
1364 } else {
1365 DBG("\t<unbound> <-> <%s:%i>",
1366 getRHost().c_str(), getLocalPort());
1367 }
1368
1369 if (relay_stream) {
1370 DBG("\tinternal relay to stream %p (local port %i)",
1371 relay_stream, relay_stream->getLocalPort());
1372 }
1373 else DBG("\tno relay");
1374
1375 DBG("\tmute: %s, hold: %s, receiving: %s",
1376 BOOL_STR(mute), BOOL_STR(hold), BOOL_STR(receiving));
1377
1378 #undef BOOL_STR
1379 }
1380