1 /*
2  * Copyright (C) 2002-2003 Fhg Fokus
3  *
4  * This file is part of SEMS, a free SIP media server.
5  *
6  * SEMS is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 2 of the License, or
9  * (at your option) any later version. This program is released under
10  * the GPL with the additional exemption that compiling, linking,
11  * and/or using OpenSSL is allowed.
12  *
13  * For a license to use the SEMS software under conditions
14  * other than those described here, or to purchase support for this
15  * software, please contact iptel.org by e-mail at the following addresses:
16  *    info@iptel.org
17  *
18  * SEMS is distributed in the hope that it will be useful,
19  * but WITHOUT ANY WARRANTY; without even the implied warranty of
20  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  * GNU General Public License for more details.
22  *
23  * You should have received a copy of the GNU General Public License
24  * along with this program; if not, write to the Free Software
25  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
26  */
27 
28 #include "AmRtpStream.h"
29 #include "AmRtpPacket.h"
30 #include "AmRtpReceiver.h"
31 #include "AmConfig.h"
32 #include "AmPlugIn.h"
33 #include "AmAudio.h"
34 #include "AmUtils.h"
35 #include "AmSession.h"
36 #include "AmRtpMuxStream.h"
37 
38 #include "AmDtmfDetector.h"
39 #include "rtp/telephone_event.h"
40 #include "amci/codecs.h"
41 #include "AmJitterBuffer.h"
42 
43 #include "sip/resolver.h"
44 #include "sip/ip_util.h"
45 #include "sip/raw_sender.h"
46 #include "sip/msg_logger.h"
47 
48 #include "log.h"
49 
50 #include <assert.h>
51 #include <stdlib.h>
52 #include <unistd.h>
53 #include <sys/time.h>
54 #include <sys/types.h>
55 #include <sys/socket.h>
56 #include <sys/ioctl.h>
57 #include <arpa/inet.h>
58 #include <netinet/in.h>
59 
StreamInfoStreamInfo60 #ifdef WITH_ZRTP
61 #include "libzrtp/zrtp.h"
62 #endif
63 
64 #include "rtp/rtp.h"
65 
66 #include <set>
67 using std::set;
68 
69 void PayloadMask::clear()
70 {
71   memset(bits, 0, sizeof(bits));
72 }
73 
74 void PayloadMask::set_all()
RtpPacketRtpPacket75 {
76   memset(bits, 0xFF, sizeof(bits));
77 }
78 
79 void PayloadMask::invert()
80 {
81   // assumes that bits[] contains 128 bits
~RtpPacketRtpPacket82   unsigned long long* ull = (unsigned long long*)bits;
83   ull[0] = ~ull[0];
84   ull[1] = ~ull[1];
85 }
86 
87 PayloadMask::PayloadMask(const PayloadMask &src)
88 {
89   memcpy(bits, src.bits, sizeof(bits));
90 }
91 
92 //////////////////////////////////////////////////////////////////////////////////////////////////////////
93 
94 /*
95  * This function must be called before setLocalPort, because
96  * setLocalPort will bind the socket and it will be not
97  * possible to change the IP later
98  */
99 void AmRtpStream::setLocalIP(const string& ip)
100 {
101   if (!am_inet_pton(ip.c_str(), &l_saddr)) {
102     throw string ("AmRtpStream::setLocalIP: Invalid IP address: ") + ip;
103   }
104   DBG("ip = %s\n",ip.c_str());
105 }
106 
107 int AmRtpStream::hasLocalSocket() {
108   return l_sd;
109 }
110 
111 int AmRtpStream::getLocalSocket()
112 {
113   if (l_sd)
114     return l_sd;
115 
116   int sd=0, rtcp_sd=0;
117   if((sd = socket(l_saddr.ss_family,SOCK_DGRAM,0)) == -1) {
118     ERROR("%s\n",strerror(errno));
119     throw string ("while creating new socket.");
120   }
121 
122   if((rtcp_sd = socket(l_saddr.ss_family,SOCK_DGRAM,0)) == -1) {
123     ERROR("%s\n",strerror(errno));
124     throw string ("while creating new socket.");
125   }
126 
127   int true_opt = 1;
128   if(ioctl(sd, FIONBIO , &true_opt) == -1){
129     ERROR("%s\n",strerror(errno));
130     close(sd);
131     throw string ("while setting socket non blocking.");
132   }
133 
134   if(ioctl(rtcp_sd, FIONBIO , &true_opt) == -1){
135     ERROR("%s\n",strerror(errno));
136     close(sd);
137     throw string ("while setting socket non blocking.");
138   }
139 
140   l_sd = sd;
141   l_rtcp_sd = rtcp_sd;
142 
143   return l_sd;
144 }
145 
146 void AmRtpStream::setLocalPort(unsigned short p)
147 {
148   if(l_port)
149     return;
150 
151   if(l_if < 0) {
152     if (session) l_if = session->getRtpInterface();
153     else {
154       ERROR("BUG: no session when initializing RTP stream, invalid interface can be used\n");
155       l_if = 0;
156     }
157   }
158 
159   int retry = 10;
160   unsigned short port = 0;
161   for(;retry; --retry){
162 
163     if (!getLocalSocket())
164       return;
165 
166     if(!p)
167       port = AmConfig::RTP_Ifs[l_if].getNextRtpPort();
168     else
169       port = p;
170 
171     am_set_port(&l_saddr,port+1);
172     if(bind(l_rtcp_sd,(const struct sockaddr*)&l_saddr,SA_len(&l_saddr))) {
173       DBG("bind: %s\n",strerror(errno));
174       goto try_another_port;
175     }
176 
177     am_set_port(&l_saddr,port);
178     if(bind(l_sd,(const struct sockaddr*)&l_saddr,SA_len(&l_saddr))) {
179       DBG("bind: %s\n",strerror(errno));
180       goto try_another_port;
181     }
182 
183     // both bind() succeeded!
184     break;
185 
186   try_another_port:
187       close(l_sd);
188       l_sd = 0;
189       close(l_rtcp_sd);
190       l_rtcp_sd = 0;
191   }
192 
193   int true_opt = 1;
194   if (!retry){
195     ERROR("could not find a free RTP port\n");
196     throw string("could not find a free RTP port");
197   }
198 
199   // rco: does that make sense after bind() ????
200   if(setsockopt(l_sd, SOL_SOCKET, SO_REUSEADDR,
201 		(void*)&true_opt, sizeof (true_opt)) == -1) {
202 
203     ERROR("%s\n",strerror(errno));
204     close(l_sd);
205     l_sd = 0;
206     throw string ("while setting local address reusable.");
207   }
208 
209   l_port = port;
210   l_rtcp_port = port+1;
211 
212   if(!p) {
213     AmRtpReceiver::instance()->addStream(l_sd, this);
214     AmRtpReceiver::instance()->addStream(l_rtcp_sd, this);
215     DBG("added stream [%p] to RTP receiver (%s:%i/%i)\n", this,
216 	get_addr_str((sockaddr_storage*)&l_saddr).c_str(),l_port,l_rtcp_port);
217   }
218 
219   memcpy(&l_rtcp_saddr, &l_saddr, sizeof(l_saddr));
220   am_set_port(&l_rtcp_saddr, l_rtcp_port);
221 }
222 
223 int AmRtpStream::ping()
224 {
225   // TODO:
226   //  - we'd better send an empty UDP packet
227   //    for this purpose.
228 
229   unsigned char ping_chr[2];
230 
231   ping_chr[0] = 0;
232   ping_chr[1] = 0;
233 
234   AmRtpPacket rp;
235   rp.payload = payload;
236   rp.marker = true;
237   rp.sequence = sequence++;
238   rp.timestamp = 0;
239   rp.ssrc = l_ssrc;
240   rp.compile((unsigned char*)ping_chr,2);
241 
242   rp.setAddr(&r_saddr);
243   if(rp.send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx,&l_saddr) < 0){
244     ERROR("while sending RTP packet.\n");
245     return -1;
246   }
247 
248   return 2;
249 }
250 
251 int AmRtpStream::compile_and_send(const int payload, bool marker, unsigned int ts,
252 				  unsigned char* buffer, unsigned int size) {
253   AmRtpPacket rp;
254   rp.payload = payload;
255   rp.timestamp = ts;
256   rp.marker = marker;
257   rp.sequence = sequence++;
258   rp.ssrc = l_ssrc;
259   rp.compile((unsigned char*)buffer,size);
260 
261   rp.setAddr(&r_saddr);
262 
263 #ifdef WITH_ZRTP
264   if (session && session->enable_zrtp){
265     if (NULL == session->zrtp_session_state.zrtp_audio) {
266       ERROR("ZRTP enabled on session, but no audio stream created\n");
267       return -1;
268     }
269 
270     unsigned int size = rp.getBufferSize();
271     zrtp_status_t status = zrtp_process_rtp(session->zrtp_session_state.zrtp_audio,
272 					    (char*)rp.getBuffer(), &size);
273     switch (status) {
274     case zrtp_status_drop: {
275       DBG("ZRTP says: drop packet! %u - %u\n", size, rp.getBufferSize());
276       return 0;
277     }
278     case zrtp_status_ok: {
279       //      DBG("ZRTP says: ok!\n");
280       if (rp.getBufferSize() != size)
281 //       DBG("SEND packet size before: %d, after %d\n",
282 // 	   rp.getBufferSize(), size);
283       rp.setBufferSize(size);
284     } break;
285     default:
286     case zrtp_status_fail: {
287       DBG("ZRTP says: fail!\n");
288       //      DBG("(f)");
289       return 0;
290     }
291 
292     }
293 
294   }
295 #endif
296 
297   if(rp.send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx, &l_saddr, rtp_mux_remote_ip, rtp_mux_remote_port) < 0){
298     ERROR("while sending RTP packet.\n");
299     return -1;
300   }
301 
302   if (logger) rp.logSent(logger, &l_saddr);
303 
304   return size;
305 }
306 
307 int AmRtpStream::send( unsigned int ts, unsigned char* buffer, unsigned int size )
308 {
309   if ((mute) || (hold))
310     return 0;
311 
312   if(remote_telephone_event_pt.get())
313     dtmf_sender.sendPacket(ts,remote_telephone_event_pt->payload_type,this);
314 
315   if(!size)
316     return -1;
317 
318   PayloadMappingTable::iterator it = pl_map.find(payload);
319   if ((it == pl_map.end()) || (it->second.remote_pt < 0)) {
320     ERROR("sending packet with unsupported remote payload type %d\n", payload);
321     return -1;
322   }
323 
324   return compile_and_send(it->second.remote_pt, false, ts, buffer, size);
325 }
326 
327 int AmRtpStream::send_raw( char* packet, unsigned int length )
328 {
329   if ((mute) || (hold))
330     return 0;
331 
332   AmRtpPacket rp;
333   rp.compile_raw((unsigned char*)packet, length);
334   rp.setAddr(&r_saddr);
335 
336   if(rp.send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx, &l_saddr, rtp_mux_remote_ip, rtp_mux_remote_port) < 0){
337     ERROR("while sending raw RTP packet.\n");
338     return -1;
339   }
340 
341   if (logger) rp.logSent(logger, &l_saddr);
342 
343   return length;
344 }
345 
346 // returns
347 // @param ts              [out] timestamp of the received packet,
348 //                              in audio buffer relative time
349 // @param audio_buffer_ts [in]  current ts at the audio_buffer
350 
351 int AmRtpStream::receive( unsigned char* buffer, unsigned int size,
352 			  unsigned int& ts, int &out_payload)
353 {
354   AmRtpPacket* rp = NULL;
355   int err = nextPacket(rp);
356 
357   if(err <= 0)
358     return err;
359 
360   if (!rp)
361     return 0;
362 
363   handleSymmetricRtp(&rp->addr,false);
364 
365   /* do we have a new talk spurt? */
366   begin_talk = ((last_payload == 13) || rp->marker);
367   last_payload = rp->payload;
368 
369   if(!rp->getDataSize()) {
370     mem.freePacket(rp);
371     return RTP_EMPTY;
372   }
373 
374   if (rp->payload == getLocalTelephoneEventPT())
375     {
376       recvDtmfPacket(rp);
377       mem.freePacket(rp);
378       return RTP_DTMF;
379     }
380 
381   assert(rp->getData());
382   if(rp->getDataSize() > size){
383     ERROR("received too big RTP packet\n");
384     mem.freePacket(rp);
385     return RTP_BUFFER_SIZE;
386   }
387 
388   memcpy(buffer,rp->getData(),rp->getDataSize());
389   ts = rp->timestamp;
390   out_payload = rp->payload;
391 
392   int res = rp->getDataSize();
393   mem.freePacket(rp);
394   return res;
395 }
396 
397 AmRtpStream::AmRtpStream(AmSession* _s, int _if)
398   : sdp_media_index(-1),
399     r_port(0),
400     l_if(_if),
401     l_port(0),
402     l_sd(0),
403     r_ssrc_i(false),
404     passive(false),
405     passive_rtcp(false),
406     hold(false),
407     remotehold(false),
408     monitor_rtp_timeout(true),
409     receiving(true),
410     relay_enabled(false),
411     relay_raw(false),
412     relay_stream(NULL),
413     relay_transparent_seqno(true),
414     relay_transparent_ssrc(true),
415     relay_filter_dtmf(false),
416     session(_s),
417     logger(NULL),
418     offer_answer_used(true),
419     active(false), // do not return any data unless something really received
420     mute(false),
421     force_receive_dtmf(false)
422 {
423 
424   memset(&r_saddr,0,sizeof(struct sockaddr_storage));
425   memset(&l_saddr,0,sizeof(struct sockaddr_storage));
426 
427   l_ssrc = get_random();
428   sequence = get_random();
429   clearRTPTimeout();
430 
431   // by default the system codecs
432   payload_provider = AmPlugIn::instance();
433 }
434 
435 AmRtpStream::~AmRtpStream()
436 {
437   if (!rtp_mux_remote_ip.empty()) {
438     DBG("RTP MUX: closing on RTP stream instance [%p]\n", this);
439     AmRtpMuxSender::instance()->close(rtp_mux_remote_ip, rtp_mux_remote_port, getRPort());
440   }
441 
442   if(l_sd){
443     if (AmRtpReceiver::haveInstance()){
444       AmRtpReceiver::instance()->removeStream(l_sd, getLocalPort());
445       AmRtpReceiver::instance()->removeStream(l_rtcp_sd, getLocalRtcpPort());
446     }
447     close(l_sd);
448     close(l_rtcp_sd);
449   }
450   if (logger) dec_ref(logger);
451 }
452 
453 int AmRtpStream::getLocalPort()
454 {
455   //  if (hold)
456   //    return 0;
457 
458   if(!l_port)
459     setLocalPort();
460 
461   return l_port;
462 }
463 
464 int AmRtpStream::getLocalRtcpPort()
465 {
466   if(!l_rtcp_port)
467     setLocalPort();
468 
469   return l_rtcp_port;
470 }
471 
472 int AmRtpStream::getRPort()
473 {
474   return r_port;
475 }
476 
477 string AmRtpStream::getRHost()
478 {
479   return r_host;
480 }
481 
482 void AmRtpStream::setRAddr(const string& addr, unsigned short port,
483 			   unsigned short rtcp_port)
484 {
485   DBG("RTP remote address set to %s:(%u/%u)\n",
486       addr.c_str(),port,rtcp_port);
487 
488   struct sockaddr_storage ss;
489   memset (&ss, 0, sizeof (ss));
490 
491   /* inet_aton only supports dot-notation IP address strings... but an RFC
492    * 4566 unicast-address, as found in c=, can be an FQDN (or other!).
493    */
494   dns_handle dh;
495   if (resolver::instance()->resolve_name(addr.c_str(),&dh,&ss,IPv4) < 0) {
496     WARN("Address not valid (host: %s).\n", addr.c_str());
497     throw string("invalid address") + addr;
498   }
499 
500   r_host = addr;
501   if(port)      r_port      = port;
502   if(rtcp_port) r_rtcp_port = rtcp_port;
503 
504   memcpy(&r_saddr,&ss,sizeof(struct sockaddr_storage));
505   am_set_port(&r_saddr,r_port);
506 
507   mute = ((r_saddr.ss_family == AF_INET) &&
508 	  (SAv4(&r_saddr)->sin_addr.s_addr == INADDR_ANY)) ||
509     ((r_saddr.ss_family == AF_INET6) &&
510      IN6_IS_ADDR_UNSPECIFIED(&SAv6(&r_saddr)->sin6_addr));
511 }
512 
513 void AmRtpStream::handleSymmetricRtp(struct sockaddr_storage* recv_addr, bool rtcp) {
514 
515   if((!rtcp && passive) || (rtcp && passive_rtcp)) {
516 
517     struct sockaddr_in* in_recv = (struct sockaddr_in*)recv_addr;
518     struct sockaddr_in6* in6_recv = (struct sockaddr_in6*)recv_addr;
519 
520     struct sockaddr_in* in_addr = (struct sockaddr_in*)&r_saddr;
521     struct sockaddr_in6* in6_addr = (struct sockaddr_in6*)&r_saddr;
522 
523     unsigned short port = am_get_port(recv_addr);
524 
525     // symmetric RTP
526     if ( (!rtcp && (port != r_port)) || (rtcp && (port != r_rtcp_port)) ||
527 	 ((recv_addr->ss_family == AF_INET) &&
528 	  (in_addr->sin_addr.s_addr != in_recv->sin_addr.s_addr)) ||
529 	 ((recv_addr->ss_family == AF_INET6) &&
530 	  (memcmp(&in6_addr->sin6_addr,
531 		      &in6_recv->sin6_addr,
532 		      sizeof(struct in6_addr))))
533 	 ) {
534 
535       string addr_str = get_addr_str(recv_addr);
536       setRAddr(addr_str, !rtcp ? port : 0, rtcp ? port : 0);
537       DBG("Symmetric %s: setting new remote address: %s:%i\n",
538 	  !rtcp ? "RTP" : "RTCP", addr_str.c_str(),port);
539 
540     } else {
541       const char* prot = rtcp ? "RTCP" : "RTP";
542       DBG("Symmetric %s: remote end sends %s from advertised address."
543 	  " Leaving passive mode.\n",prot,prot);
544     }
545 
546     // avoid comparing each time sender address
547     if(!rtcp)
548       passive = false;
549     else
550       passive_rtcp = false;
551   }
552 }
553 
554 void AmRtpStream::setPassiveMode(bool p)
555 {
556   passive_rtcp = passive = p;
557   if (p) {
558     DBG("The other UA is NATed or passive mode forced: switched to passive mode.\n");
559   } else {
560     DBG("Passive mode not activated.\n");
561   }
562 }
563 
564 void AmRtpStream::getSdp(SdpMedia& m)
565 {
566   m.port = getLocalPort();
567   m.nports = 0;
568   m.transport = TP_RTPAVP;
569   m.send = !hold;
570   m.recv = !remotehold;
571   m.dir = SdpMedia::DirBoth;
572 }
573 
574 void AmRtpStream::getSdpOffer(unsigned int index, SdpMedia& offer)
575 {
576   sdp_media_index = index;
577   getSdp(offer);
578   offer.payloads.clear();
579   payload_provider->getPayloads(offer.payloads);
580 }
581 
582 void AmRtpStream::getSdpAnswer(unsigned int index, const SdpMedia& offer, SdpMedia& answer)
583 {
584   sdp_media_index = index;
585   getSdp(answer);
586   offer.calcAnswer(payload_provider,answer);
587 }
588 
589 int AmRtpStream::init(const AmSdp& local,
590 		      const AmSdp& remote,
591                       bool force_passive_mode)
592 {
593   if((sdp_media_index < 0) ||
594      ((unsigned)sdp_media_index >= local.media.size()) ||
595      ((unsigned)sdp_media_index >= remote.media.size()) ) {
596 
597     ERROR("Media index %i is invalid, either within local or remote SDP (or both)",sdp_media_index);
598     return -1;
599   }
600 
601   const SdpMedia& local_media = local.media[sdp_media_index];
602   const SdpMedia& remote_media = remote.media[sdp_media_index];
603 
604   payloads.clear();
605   pl_map.clear();
606   payloads.resize(local_media.payloads.size());
607 
608   int i=0;
609   vector<SdpPayload>::const_iterator sdp_it = local_media.payloads.begin();
610   vector<Payload>::iterator p_it = payloads.begin();
611 
612   // first pass on local SDP - fill pl_map with intersection of codecs
613   while(sdp_it != local_media.payloads.end()) {
614 
615     int int_pt;
616 
617     if (local_media.transport == TP_RTPAVP && sdp_it->payload_type < 20) int_pt = sdp_it->payload_type;
618     else int_pt = payload_provider->getDynPayload(sdp_it->encoding_name,
619         sdp_it->clock_rate,
620         sdp_it->encoding_param);
621 
622     amci_payload_t* a_pl = NULL;
623     if(int_pt >= 0)
624       a_pl = payload_provider->payload(int_pt);
625 
626     if(a_pl == NULL){
627       if (relay_payloads.get(sdp_it->payload_type)) {
628         // this payload should be relayed, ignore
629         ++sdp_it;
630         continue;
631       } else {
632         DBG("No internal payload corresponding to type %s/%i (ignoring)\n",
633               sdp_it->encoding_name.c_str(),
634               sdp_it->clock_rate);
635 	// ignore this payload
636         ++sdp_it;
637         continue;
638       }
639     };
640 
641     p_it->pt         = sdp_it->payload_type;
642     p_it->name       = sdp_it->encoding_name;
643     p_it->codec_id   = a_pl->codec_id;
644     p_it->clock_rate = a_pl->sample_rate;
645     p_it->advertised_clock_rate = sdp_it->clock_rate;
646     p_it->format_parameters = sdp_it->sdp_format_parameters;
647 
648     pl_map[sdp_it->payload_type].index     = i;
649     pl_map[sdp_it->payload_type].remote_pt = -1;
650 
651     ++p_it;
652     ++sdp_it;
653     ++i;
654   }
655 
656   // remove payloads which were not initialised (because of unknown payloads
657   // which are to be relayed)
658   if (p_it != payloads.end()) payloads.erase(p_it, payloads.end());
659 
660   // second pass on remote SDP - initialize payload IDs used by remote (remote_pt)
661   sdp_it = remote_media.payloads.begin();
662   while(sdp_it != remote_media.payloads.end()) {
663 
664     // TODO: match not only on encoding name
665     //       but also on parameters, if necessary
666     //       Some codecs define multiple payloads
667     //       with different encoding parameters
668     PayloadMappingTable::iterator pmt_it = pl_map.end();
669     if(sdp_it->encoding_name.empty() || (local_media.transport == TP_RTPAVP && sdp_it->payload_type < 20)){
670       // must be a static payload
671       pmt_it = pl_map.find(sdp_it->payload_type);
672     }
673     else {
674       for(p_it = payloads.begin(); p_it != payloads.end(); ++p_it){
675 
676 	if(!strcasecmp(p_it->name.c_str(),sdp_it->encoding_name.c_str()) &&
677 	   (p_it->advertised_clock_rate == (unsigned int)sdp_it->clock_rate)){
678 	  pmt_it = pl_map.find(p_it->pt);
679 	  break;
680 	}
681       }
682     }
683 
684     // TODO: remove following code once proper
685     //       payload matching is implemented
686     //
687     // initialize remote_pt if not already there
688     if(pmt_it != pl_map.end() && (pmt_it->second.remote_pt < 0)){
689       pmt_it->second.remote_pt = sdp_it->payload_type;
690     }
691     ++sdp_it;
692   }
693 
694   if(!l_port){
695     // only if socket not yet bound:
696     if(session) {
697       setLocalIP(session->localMediaIP());
698     }
699     else {
700       // set local address - media c-line having precedence over session c-line
701       if (local_media.conn.address.empty())
702 	setLocalIP(local.conn.address);
703       else
704 	setLocalIP(local_media.conn.address);
705     }
706 
707     DBG("setting local port to %i",local_media.port);
708     setLocalPort(local_media.port);
709   }
710 
711   setPassiveMode(remote_media.dir == SdpMedia::DirActive || force_passive_mode);
712 
713   // set remote address - media c-line having precedence over session c-line
714   if (remote.conn.address.empty() && remote_media.conn.address.empty()) {
715     WARN("no c= line given globally or in m= section in remote SDP\n");
716     return -1;
717   }
718   if (remote_media.conn.address.empty())
719     setRAddr(remote.conn.address, remote_media.port, remote_media.port+1);
720   else
721     setRAddr(remote_media.conn.address, remote_media.port, remote_media.port+1);
722 
723   if(local_media.payloads.empty()) {
724     DBG("local_media.payloads.empty()\n");
725     return -1;
726   }
727 
728   remote_telephone_event_pt.reset(remote.telephoneEventPayload());
729   if (remote_telephone_event_pt.get()) {
730       DBG("remote party supports telephone events (pt=%i)\n",
731 	  remote_telephone_event_pt->payload_type);
732   } else {
733     DBG("remote party doesn't support telephone events\n");
734   }
735 
736   local_telephone_event_pt.reset(local.telephoneEventPayload());
737 
738   if(local_media.recv) {
739     resume();
740   } else {
741     pause();
742   }
743 
744   if(local_media.send && !hold
745      && (remote_media.port != 0)
746      && (((r_saddr.ss_family == AF_INET)
747 	  && (SAv4(&r_saddr)->sin_addr.s_addr != 0)) ||
748 	 ((r_saddr.ss_family == AF_INET6)
749 	  && (!IN6_IS_ADDR_UNSPECIFIED(&SAv6(&r_saddr)->sin6_addr))))
750      ) {
751     mute = false;
752   } else {
753     mute = true;
754   }
755 
756   payload = getDefaultPT();
757   if(payload < 0) {
758     DBG("could not set a default payload\n");
759     return -1;
760   }
761   DBG("default payload selected = %i\n",payload);
762   last_payload = payload;
763 
764   active = false; // mark as nothing received yet
765   return 0;
766 }
767 
768 void AmRtpStream::setReceiving(bool r) {
769   DBG("RTP stream instance [%p] set receiving=%s\n", this, r?"true":"false");
770   receiving = r;
771 }
772 
773 void AmRtpStream::pause()
774 {
775   DBG("RTP Stream instance [%p] pausing (receiving=false)\n", this);
776   receiving = false;
777 
778 #ifdef WITH_ZRTP
779   if (session && session->enable_zrtp) {
780     session->zrtp_session_state.stopStreams();
781   }
782 #endif
783 
784 }
785 
786 void AmRtpStream::resume()
787 {
788   DBG("RTP Stream instance [%p] resuming (receiving=true, clearing biffers/TS/TO)\n", this);
789   clearRTPTimeout();
790   receive_mut.lock();
791   mem.clear();
792   receive_buf.clear();
793   while (!rtp_ev_qu.empty())
794     rtp_ev_qu.pop();
795   receive_mut.unlock();
796   receiving = true;
797 
798 #ifdef WITH_ZRTP
799   if (session && session->enable_zrtp) {
800     session->zrtp_session_state.startStreams(get_ssrc());
801   }
802 #endif
803 }
804 
805 void AmRtpStream::setOnHold(bool on_hold) {
806   hold = on_hold;
807 }
808 
809 bool AmRtpStream::getOnHold() {
810   return hold;
811 }
812 
813 void AmRtpStream::setRemoteHold(bool remote_hold) {
814   remotehold = remote_hold;
815 }
816 
817 bool AmRtpStream::getRemoteHold() {
818   return remotehold;
819 }
820 
821 void AmRtpStream::recvDtmfPacket(AmRtpPacket* p) {
822   if (p->payload == getLocalTelephoneEventPT()) {
823     dtmf_payload_t* dpl = (dtmf_payload_t*)p->getData();
824 
825     DBG("DTMF: event=%i; e=%i; r=%i; volume=%i; duration=%i; ts=%u session = [%p]\n",
826 	dpl->event,dpl->e,dpl->r,dpl->volume,ntohs(dpl->duration),p->timestamp, session);
827     if (session)
828       session->postDtmfEvent(new AmRtpDtmfEvent(dpl, getLocalTelephoneEventRate(), p->timestamp));
829   }
830 }
831 
832 void AmRtpStream::bufferPacket(AmRtpPacket* p)
833 {
834   clearRTPTimeout(&p->recv_time);
835 
836   if (!receiving) {
837 
838     if (passive) {
839       handleSymmetricRtp(&p->addr,false);
840     }
841 
842     if (force_receive_dtmf) {
843       recvDtmfPacket(p);
844     }
845 
846     mem.freePacket(p);
847     return;
848   }
849 
850   if (relay_enabled) { // todo: ZRTP
851     if (force_receive_dtmf) {
852       recvDtmfPacket(p);
853     }
854 
855     // Relay DTMF packets if current audio payload
856     // is also relayed.
857     // Else, check whether or not we should relay this payload
858 
859     bool is_dtmf_packet = (p->payload == getLocalTelephoneEventPT());
860 
861       if (relay_raw || (is_dtmf_packet && !active) ||
862 	  relay_payloads.get(p->payload)) {
863 
864       if(active){
865 	DBG("switching to relay-mode\t(ts=%u;stream=%p)\n",
866 	    p->timestamp,this);
867 	active = false;
868       }
869       handleSymmetricRtp(&p->addr,false);
870 
871       if (NULL != relay_stream &&
872 	  (!(relay_filter_dtmf && is_dtmf_packet))) {
873         relay_stream->relay(p);
874       }
875 
876       mem.freePacket(p);
877       return;
878     }
879   }
880 
881 #ifndef WITH_ZRTP
882   // throw away ZRTP packets
883   if(p->version != RTP_VERSION) {
884       mem.freePacket(p);
885       return;
886   }
887 #endif
888 
889   receive_mut.lock();
890 
891 #ifdef WITH_ZRTP
892   if (session && session->enable_zrtp) {
893 
894     if (NULL == session->zrtp_session_state.zrtp_audio) {
895       WARN("dropping received packet, as there's no ZRTP stream initialized\n");
896       receive_mut.unlock();
897       mem.freePacket(p);
898       return;
899     }
900 
901     unsigned int size = p->getBufferSize();
902     zrtp_status_t status = zrtp_process_srtp(session->zrtp_session_state.zrtp_audio, (char*)p->getBuffer(), &size);
903     switch (status)
904       {
905       case zrtp_status_ok: {
906 	p->setBufferSize(size);
907 	if (p->parse() < 0) {
908 	  ERROR("parsing decoded packet!\n");
909 	  mem.freePacket(p);
910 	} else {
911 
912           if(p->payload == getLocalTelephoneEventPT()) {
913             rtp_ev_qu.push(p);
914           } else {
915 	    if(!receive_buf.insert(ReceiveBuffer::value_type(p->timestamp,p)).second) {
916 	      // insert failed
917 	      mem.freePacket(p);
918 	    }
919           }
920 
921 	}
922       }	break;
923 
924       case zrtp_status_drop: {
925 	//
926 	// This is a protocol ZRTP packet or masked RTP media.
927 	// In either case the packet must be dropped to protect your
928 	// media codec
929 	mem.freePacket(p);
930 
931       } break;
932 
933       case zrtp_status_fail:
934       default: {
935 	ERROR("zrtp_status_fail!\n");
936         //
937         // This is some kind of error - see logs for more information
938         //
939 	mem.freePacket(p);
940       } break;
941       }
942   } else {
943 #endif // WITH_ZRTP
944 
945     if(p->payload == getLocalTelephoneEventPT()) {
946       rtp_ev_qu.push(p);
947     } else {
948       if(!receive_buf.insert(ReceiveBuffer::value_type(p->timestamp,p)).second) {
949 	// insert failed
950 	mem.freePacket(p);
951       }
952     }
953 
954 #ifdef WITH_ZRTP
955   }
956 #endif
957   receive_mut.unlock();
958 }
959 
960 void AmRtpStream::clearRTPTimeout(struct timeval* recv_time) {
961   memcpy(&last_recv_time, recv_time, sizeof(struct timeval));
962 }
963 
964 void AmRtpStream::clearRTPTimeout() {
965   gettimeofday(&last_recv_time,NULL);
966 }
967 
968 int AmRtpStream::getDefaultPT()
969 {
970   for(PayloadCollection::iterator it = payloads.begin();
971       it != payloads.end(); ++it){
972 
973     // skip telephone-events payload
974     if(it->codec_id == CODEC_TELEPHONE_EVENT)
975       continue;
976 
977     // skip incompatible payloads
978     PayloadMappingTable::iterator pl_it = pl_map.find(it->pt);
979     if ((pl_it == pl_map.end()) || (pl_it->second.remote_pt < 0))
980       continue;
981 
982     return it->pt;
983   }
984 
985   return -1;
986 }
987 
988 int AmRtpStream::nextPacket(AmRtpPacket*& p)
989 {
990   if (!receiving && !passive)
991     return RTP_EMPTY;
992 
993   struct timeval now;
994   struct timeval diff;
995   gettimeofday(&now,NULL);
996 
997   receive_mut.lock();
998   timersub(&now,&last_recv_time,&diff);
999   if(monitor_rtp_timeout &&
1000      AmConfig::DeadRtpTime &&
1001      (diff.tv_sec > 0) &&
1002      ((unsigned int)diff.tv_sec > AmConfig::DeadRtpTime)){
1003     WARN("RTP Timeout detected. Last received packet is too old "
1004 	 "(diff.tv_sec = %i\n",(unsigned int)diff.tv_sec);
1005     receive_mut.unlock();
1006     return RTP_TIMEOUT;
1007   }
1008 
1009   if(!rtp_ev_qu.empty()) {
1010     // first return RTP telephone event payloads
1011     p = rtp_ev_qu.front();
1012     rtp_ev_qu.pop();
1013     receive_mut.unlock();
1014     return 1;
1015   }
1016 
1017   if(receive_buf.empty()){
1018     receive_mut.unlock();
1019     return RTP_EMPTY;
1020   }
1021 
1022   p = receive_buf.begin()->second;
1023   receive_buf.erase(receive_buf.begin());
1024   receive_mut.unlock();
1025 
1026   return 1;
1027 }
1028 
1029 AmRtpPacket *AmRtpStream::reuseBufferedPacket()
1030 {
1031   AmRtpPacket *p = NULL;
1032 
1033   receive_mut.lock();
1034   if(!receive_buf.empty()) {
1035     p = receive_buf.begin()->second;
1036     receive_buf.erase(receive_buf.begin());
1037   }
1038   receive_mut.unlock();
1039   return p;
1040 }
1041 
1042 void AmRtpStream::recvPacket(int fd, unsigned char* pkt, size_t len)
1043 {
1044   if(fd == l_rtcp_sd){
1045     recvRtcpPacket();
1046     return;
1047   }
1048 
1049   AmRtpPacket* p = mem.newPacket();
1050   if (!p) p = reuseBufferedPacket();
1051   if (!p) {
1052     DBG("out of buffers for RTP packets, dropping (stream [%p])\n",
1053 	this);
1054     // drop received data
1055     if (NULL != pkt) {
1056       AmRtpPacket dummy;
1057       dummy.recv(l_sd);
1058     }
1059     return;
1060   }
1061 
1062   int recv_res = 0;
1063   if (NULL != pkt) {
1064     // "receive" from buffer
1065     recv_res = p->recv(pkt, len);
1066   } else {
1067     // receive from network
1068     recv_res = p->recv(l_sd);
1069   }
1070 
1071   if(recv_res > 0){
1072     int parse_res = 0;
1073 
1074     if (logger) p->logReceived(logger, &l_saddr);
1075 
1076     gettimeofday(&p->recv_time,NULL);
1077 
1078     if(!relay_raw
1079 #ifdef WITH_ZRTP
1080        && !(session && session->enable_zrtp)
1081 #endif
1082        ) {
1083       parse_res = p->parse();
1084     }
1085 
1086     if (parse_res == -1) {
1087       DBG("error while parsing RTP packet.\n");
1088       clearRTPTimeout(&p->recv_time);
1089       mem.freePacket(p);
1090     } else {
1091       bufferPacket(p);
1092     }
1093   } else {
1094     mem.freePacket(p);
1095   }
1096 }
1097 
1098 void AmRtpStream::recvRtcpPacket()
1099 {
1100   struct sockaddr_storage recv_addr;
1101   socklen_t recv_addr_len = sizeof(recv_addr);
1102   unsigned char buffer[4096];
1103 
1104   int recved_bytes = recvfrom(l_rtcp_sd,buffer,sizeof(buffer),0,
1105 			      (struct sockaddr*)&recv_addr,
1106 			      &recv_addr_len);
1107 
1108   if(recved_bytes < 0) {
1109     if((errno != EINTR) &&
1110        (errno != EAGAIN)) {
1111       ERROR("rtcp recv(%d): %s",l_rtcp_sd,strerror(errno));
1112     }
1113     return;
1114   }
1115   else
1116     if(!recved_bytes) return;
1117 
1118   static const cstring empty;
1119   if (logger)
1120     logger->log((const char *)buffer, recved_bytes, &recv_addr, &l_rtcp_saddr, empty);
1121 
1122   // clear RTP timer
1123   clearRTPTimeout();
1124 
1125   handleSymmetricRtp(&recv_addr,true);
1126 
1127   if(!relay_enabled || !relay_stream ||
1128      !relay_stream->l_sd)
1129     return;
1130 
1131   if((size_t)recved_bytes > sizeof(buffer)) {
1132     ERROR("recved huge RTCP packet (%d)",recved_bytes);
1133     return;
1134   }
1135 
1136   struct sockaddr_storage rtcp_raddr;
1137   memcpy(&rtcp_raddr,&relay_stream->r_saddr,sizeof(rtcp_raddr));
1138   am_set_port(&rtcp_raddr, relay_stream->r_rtcp_port);
1139 
1140   // FIXME: RTCP relay RTP MUX
1141   int err;
1142   if(AmConfig::UseRawSockets) {
1143     err = raw_sender::send((char*)buffer,recved_bytes,
1144 			   AmConfig::RTP_Ifs[l_if].NetIfIdx,
1145 			   &relay_stream->l_saddr,
1146 			   &rtcp_raddr);
1147   }
1148   else {
1149     err = sendto(relay_stream->l_rtcp_sd,buffer,recved_bytes,0,
1150 		 (const struct sockaddr *)&rtcp_raddr,
1151 		 SA_len(&rtcp_raddr));
1152   }
1153 
1154   if(err < 0){
1155     ERROR("could not relay RTCP packet: %s\n",strerror(errno));
1156     return;
1157   }
1158 
1159   if (logger)
1160     logger->log((const char *)buffer, recved_bytes, &relay_stream->l_rtcp_saddr, &rtcp_raddr, empty);
1161 
1162 }
1163 
1164 void AmRtpStream::relay(AmRtpPacket* p)
1165 {
1166   // not yet initialized
1167   // or muted/on-hold
1168   if (!l_port || mute || hold)
1169     return;
1170 
1171   if(session && !session->onBeforeRTPRelay(p,&r_saddr))
1172     return;
1173 
1174   rtp_hdr_t* hdr = (rtp_hdr_t*)p->getBuffer();
1175   if (!relay_raw && !relay_transparent_seqno)
1176     hdr->seq = htons(sequence++);
1177   if (!relay_raw && !relay_transparent_ssrc)
1178     hdr->ssrc = htonl(l_ssrc);
1179   p->setAddr(&r_saddr);
1180 
1181   if(p->send(l_sd, AmConfig::RTP_Ifs[l_if].NetIfIdx, &l_saddr, rtp_mux_remote_ip, rtp_mux_remote_port) < 0){
1182     ERROR("while sending RTP packet to '%s':%i\n",
1183 	  get_addr_str(&r_saddr).c_str(),am_get_port(&r_saddr));
1184   }
1185   else {
1186     if (logger) p->logSent(logger, &l_saddr);
1187     if(session) session->onAfterRTPRelay(p,&r_saddr);
1188   }
1189 }
1190 
1191 int AmRtpStream::getLocalTelephoneEventRate()
1192 {
1193   if (local_telephone_event_pt.get())
1194     return local_telephone_event_pt->clock_rate;
1195   return 0;
1196 }
1197 
1198 int AmRtpStream::getLocalTelephoneEventPT()
1199 {
1200   if(local_telephone_event_pt.get())
1201     return local_telephone_event_pt->payload_type;
1202   return -1;
1203 }
1204 
1205 void AmRtpStream::setPayloadProvider(AmPayloadProvider* pl_prov)
1206 {
1207   payload_provider = pl_prov;
1208 }
1209 
1210 void AmRtpStream::sendDtmf(int event, unsigned int duration_ms) {
1211   dtmf_sender.queueEvent(event,duration_ms,getLocalTelephoneEventRate());
1212 }
1213 
1214 void AmRtpStream::setRelayStream(AmRtpStream* stream) {
1215   relay_stream = stream;
1216   DBG("set relay stream [%p] for RTP instance [%p]\n",
1217       stream, this);
1218 }
1219 
1220 void AmRtpStream::setRelayPayloads(const PayloadMask &_relay_payloads)
1221 {
1222   relay_payloads = _relay_payloads;
1223 }
1224 
1225 void AmRtpStream::enableRtpRelay() {
1226   DBG("enabled RTP relay for RTP stream instance [%p]\n", this);
1227   relay_enabled = true;
1228 }
1229 
1230 void AmRtpStream::disableRtpRelay() {
1231   DBG("disabled RTP relay for RTP stream instance [%p]\n", this);
1232   relay_enabled = false;
1233 }
1234 
1235 void AmRtpStream::enableRawRelay()
1236 {
1237   DBG("enabled RAW relay for RTP stream instance [%p]\n", this);
1238   relay_raw = true;
1239 }
1240 
1241 void AmRtpStream::disableRawRelay()
1242 {
1243   DBG("disabled RAW relay for RTP stream instance [%p]\n", this);
1244   relay_raw = false;
1245 }
1246 
1247 void AmRtpStream::setRtpRelayTransparentSeqno(bool transparent) {
1248   DBG("%sabled RTP relay transparent seqno for RTP stream instance [%p]\n",
1249       transparent ? "en":"dis", this);
1250   relay_transparent_seqno = transparent;
1251 }
1252 
1253 void AmRtpStream::setRtpRelayTransparentSSRC(bool transparent) {
1254   DBG("%sabled RTP relay transparent SSRC for RTP stream instance [%p]\n",
1255       transparent ? "en":"dis", this);
1256   relay_transparent_ssrc = transparent;
1257 }
1258 
1259 void AmRtpStream::setRtpRelayFilterRtpDtmf(bool filter) {
1260   DBG("%sabled RTP relay filtering of RTP DTMF (2833 / 4733) for RTP stream instance [%p]\n",
1261       filter ? "en":"dis", this);
1262   relay_filter_dtmf = filter;
1263 }
1264 
1265 void AmRtpStream::setRtpMuxRemote(const string& remote_ip, unsigned int remote_port) {
1266   rtp_mux_remote_ip = remote_ip;
1267   rtp_mux_remote_port = remote_port;
1268   if (remote_ip.empty() || !remote_port) {
1269     DBG("RTP stream [%p]: disabled RTP MUX\n", this);
1270   } else {
1271     DBG("RTP stream [%p]: set RTP MUX remote to %s:%u\n", this, remote_ip.c_str(), remote_port);
1272   }
1273 }
1274 
1275 void AmRtpStream::stopReceiving()
1276 {
1277   if (hasLocalSocket()){
1278     DBG("remove stream [%p] from RTP receiver\n", this);
1279     AmRtpReceiver::instance()->removeStream(getLocalSocket(), getLocalPort());
1280     if (l_rtcp_sd > 0) AmRtpReceiver::instance()->removeStream(l_rtcp_sd, getLocalRtcpPort());
1281   }
1282 }
1283 
1284 void AmRtpStream::resumeReceiving()
1285 {
1286   if (hasLocalSocket()){
1287     DBG("add/resume stream [%p] into RTP receiver\n",this);
1288     AmRtpReceiver::instance()->addStream(getLocalSocket(), this);
1289     if (l_rtcp_sd > 0) AmRtpReceiver::instance()->addStream(l_rtcp_sd, this);
1290   }
1291 }
1292 
1293 string AmRtpStream::getPayloadName(int payload_type)
1294 {
1295   for(PayloadCollection::iterator it = payloads.begin();
1296       it != payloads.end(); ++it){
1297 
1298     if (it->pt == payload_type) return it->name;
1299   }
1300 
1301   return string("");
1302 }
1303 
1304 PacketMem::PacketMem()
1305   : cur_idx(0), n_used(0)
1306 {
1307   memset(used, 0, sizeof(used));
1308 }
1309 
1310 inline AmRtpPacket* PacketMem::newPacket()
1311 {
1312   if(n_used >= MAX_PACKETS)
1313     return NULL; // full
1314 
1315   while(used[cur_idx])
1316     cur_idx = (cur_idx + 1) & MAX_PACKETS_MASK;
1317 
1318   used[cur_idx] = true;
1319   n_used++;
1320 
1321   AmRtpPacket* p = &packets[cur_idx];
1322   cur_idx = (cur_idx + 1) & MAX_PACKETS_MASK;
1323 
1324   return p;
1325 }
1326 
1327 inline void PacketMem::freePacket(AmRtpPacket* p)
1328 {
1329   if (!p)  return;
1330 
1331   int idx = p-packets;
1332   assert(idx >= 0);
1333   assert(idx < MAX_PACKETS);
1334 
1335   if(!used[idx]) {
1336     ERROR("freePacket() double free: n_used = %d, idx = %d",n_used,idx);
1337     return;
1338   }
1339 
1340   used[p-packets] = false;
1341   n_used--;
1342 }
1343 
1344 inline void PacketMem::clear()
1345 {
1346   memset(used, 0, sizeof(used));
1347   n_used = cur_idx = 0;
1348 }
1349 
1350 void AmRtpStream::setLogger(msg_logger* _logger)
1351 {
1352   if (logger) dec_ref(logger);
1353   logger = _logger;
1354   if (logger) inc_ref(logger);
1355 }
1356 
1357 void AmRtpStream::debug()
1358 {
1359 #define BOOL_STR(b) ((b) ? "yes" : "no")
1360 
1361   if(hasLocalSocket() > 0) {
1362     DBG("\t<%i> <-> <%s:%i>", getLocalPort(),
1363         getRHost().c_str(), getRPort());
1364   } else {
1365     DBG("\t<unbound> <-> <%s:%i>",
1366         getRHost().c_str(), getLocalPort());
1367   }
1368 
1369   if (relay_stream) {
1370     DBG("\tinternal relay to stream %p (local port %i)",
1371       relay_stream, relay_stream->getLocalPort());
1372   }
1373   else DBG("\tno relay");
1374 
1375   DBG("\tmute: %s, hold: %s, receiving: %s",
1376       BOOL_STR(mute), BOOL_STR(hold), BOOL_STR(receiving));
1377 
1378 #undef BOOL_STR
1379 }
1380