1 #include "AmB2BMedia.h"
2 #include "AmAudio.h"
3 #include "amci/codecs.h"
4 #include <string.h>
5 #include <strings.h>
6 #include "AmB2BSession.h"
7 #include "AmRtpReceiver.h"
8 #include "sip/msg_logger.h"
9 
10 #include <algorithm>
11 #include <stdexcept>
12 
13 using namespace std;
14 
15 #define TRACE DBG
16 #define UNDEFINED_PAYLOAD (-1)
17 
/** class for computing payloads for relay the simplest way - allow relaying of
 * all payloads supported by remote party */
20 static B2BMediaStatistics b2b_stats;
21 
22 static const string zero_ip("0.0.0.0");
23 
24 static void replaceRtcpAttr(SdpMedia &m, const string& relay_address, int rtcp_port)
25 {
26   for (std::vector<SdpAttribute>::iterator a = m.attributes.begin(); a != m.attributes.end(); ++a) {
27     try {
28       if (a->attribute == "rtcp") {
29         RtcpAddress addr(a->value);
30         addr.setPort(rtcp_port);
31         if (addr.hasAddress()) addr.setAddress(relay_address);
32         a->value = addr.print();
33       }
34     }
35     catch (const exception &e) {
36       DBG("can't replace RTCP address: %s\n", e.what());
37     }
38   }
39 }
40 
41 //////////////////////////////////////////////////////////////////////////////////
42 
43 void B2BMediaStatistics::incCodecWriteUsage(const string &codec_name)
44 {
45   if (codec_name.empty()) return;
46 
47   AmLock lock(mutex);
48   map<string, int>::iterator i = codec_write_usage.find(codec_name);
49   if (i != codec_write_usage.end()) i->second++;
50   else codec_write_usage[codec_name] = 1;
B2ABEventB2ABEvent51 }
52 
53 void B2BMediaStatistics::decCodecWriteUsage(const string &codec_name)
54 {
55   if (codec_name.empty()) return;
56 
57   AmLock lock(mutex);
58   map<string, int>::iterator i = codec_write_usage.find(codec_name);
59   if (i != codec_write_usage.end()) {
60     if (i->second > 0) i->second--;
61   }
62 }
63 
64 void B2BMediaStatistics::incCodecReadUsage(const string &codec_name)
65 {
B2ABConnectAudioEventB2ABConnectAudioEvent66   if (codec_name.empty()) return;
67 
68   AmLock lock(mutex);
69   map<string, int>::iterator i = codec_read_usage.find(codec_name);
70   if (i != codec_read_usage.end()) i->second++;
71   else codec_read_usage[codec_name] = 1;
72 }
73 
B2ABConnectEarlyAudioEventB2ABConnectEarlyAudioEvent74 void B2BMediaStatistics::decCodecReadUsage(const string &codec_name)
75 {
76   if (codec_name.empty()) return;
77 
78   AmLock lock(mutex);
79   map<string, int>::iterator i = codec_read_usage.find(codec_name);
80   if (i != codec_read_usage.end()) {
81     if (i->second > 0) i->second--;
82   }
83 }
84 
85 B2BMediaStatistics *B2BMediaStatistics::instance()
86 {
87   return &b2b_stats;
88 }
89 
90 void B2BMediaStatistics::reportCodecWriteUsage(string &dst)
91 {
92   if (codec_write_usage.empty()) {
93     dst = "pcma=0"; // to be not empty
94     return;
B2ABEventB2ABConnectLegEvent95   }
96 
97   bool first = true;
98   dst.clear();
99   AmLock lock(mutex);
100   for (map<string, int>::iterator i = codec_write_usage.begin();
101       i != codec_write_usage.end(); ++i)
102   {
103     if (first) first = false;
104     else dst += ",";
105     dst += i->first;
106     dst += "=";
107     dst += int2str(i->second);
108   }
109 }
110 
B2ABConnectOtherLegExceptionEventB2ABConnectOtherLegExceptionEvent111 void B2BMediaStatistics::reportCodecReadUsage(string &dst)
112 {
113   if (codec_read_usage.empty()) {
114     dst = "pcma=0"; // to be not empty
115     return;
116   }
117 
118   bool first = true;
119   dst.clear();
120   AmLock lock(mutex);
121   for (map<string, int>::iterator i = codec_read_usage.begin();
122       i != codec_read_usage.end(); ++i)
123   {
124     if (first) first = false;
125     else dst += ",";
126     dst += i->first;
127     dst += "=";
128     dst += int2str(i->second);
129   }
130 }
131 
132 void B2BMediaStatistics::getReport(const AmArg &args, AmArg &ret)
133 {
134   AmArg write_usage;
135   AmArg read_usage;
136 
137   { // locked area
138     AmLock lock(mutex);
139 
140     for (map<string, int>::iterator i = codec_write_usage.begin();
141         i != codec_write_usage.end(); ++i)
142     {
143       AmArg avp;
144       avp["codec"] = i->first;
145       avp["count"] = i->second;
146       write_usage.push(avp);
147     }
148 
149     for (map<string, int>::iterator i = codec_read_usage.begin();
150         i != codec_read_usage.end(); ++i)
151     {
152       AmArg avp;
153       avp["codec"] = i->first;
154       avp["count"] = i->second;
155       read_usage.push(avp);
156     }
157   }
158 
159   ret["write"] = write_usage;
160   ret["read"] = read_usage;
161 }
162 
163 //////////////////////////////////////////////////////////////////////////////////
164 
165 void AudioStreamData::initialize(AmB2BSession *session)
166 {
167   stream = new AmRtpAudio(session, session->getRtpInterface());
168   stream->setRtpRelayTransparentSeqno(session->getRtpRelayTransparentSeqno());
169   stream->setRtpRelayTransparentSSRC(session->getRtpRelayTransparentSSRC());
170   stream->setRtpRelayFilterRtpDtmf(session->getEnableDtmfRtpFiltering());
171   if (session->getEnableDtmfRtpDetection())
172     stream->force_receive_dtmf = true;
173   force_symmetric_rtp = session->getRtpRelayForceSymmetricRtp();
174   enable_dtmf_transcoding = session->getEnableDtmfTranscoding();
175   session->getLowFiPLs(lowfi_payloads);
176   stream->setLocalIP(session->localMediaIP());
177   stream->setRtpMuxRemote(session->getRtpMuxRemoteIP(), session->getRtpMuxRemotePort());
178 }
179 
180 AudioStreamData::AudioStreamData(AmB2BSession *session):
181   in(NULL), initialized(false),
182   force_symmetric_rtp(false), enable_dtmf_transcoding(false),
183   dtmf_detector(NULL), dtmf_queue(NULL),
184   relay_enabled(false),
185   relay_port(0),
186   relay_paused(false),
187   muted(false), receiving(true),
188   outgoing_payload(UNDEFINED_PAYLOAD), incoming_payload(UNDEFINED_PAYLOAD)
189 {
190   if (session) initialize(session);
191   else stream = NULL; // not initialized yet
192 }
193 
194 void AudioStreamData::changeSession(AmB2BSession *session)
195 {
196   if (!stream) {
197     // the stream was not created yet
198     TRACE("delayed stream initialization for session %p\n", session);
199     if (session) initialize(session);
200   }
201   else {
202     // the stream is already created
203 
204     if (session) {
205       stream->changeSession(session);
206 
207       /* FIXME: do we want to reinitialize the stream?
208       stream->setRtpRelayTransparentSeqno(session->getRtpRelayTransparentSeqno());
209       stream->setRtpRelayTransparentSSRC(session->getRtpRelayTransparentSSRC());
210       force_symmetric_rtp = session->getRtpRelayForceSymmetricRtp();
211       enable_dtmf_transcoding = session->getEnableDtmfTranscoding();
212       session->getLowFiPLs(lowfi_payloads);
213       stream->setLocalIP(session->localMediaIP());
214       ...
215       }*/
216     }
217     else clear(); // free the stream and other stuff because it can't be used anyway
218   }
219 }
getCalleeStatus()220 
221 
222 void AudioStreamData::clear()
223 {
224   resetStats();
225   if (in) {
226     //in->close();
227     //delete in;
228     in = NULL;
229   }
230   if (stream) {
231     delete stream;
232     stream = NULL;
233   }
234   clearDtmfSink();
235   initialized = false;
236 }
237 
238 void AudioStreamData::stopStreamProcessing()
239 {
240   if (stream) stream->stopReceiving();
241 }
242 
243 void AudioStreamData::resumeStreamProcessing()
244 {
245   if (stream) stream->resumeReceiving();
246 }
247 
248 void AudioStreamData::setRelayStream(AmRtpStream *other)
249 {
250   if (!stream) return;
251 
252   if (relay_address.empty()) {
253     DBG("not setting relay for empty relay address\n");
254     stream->disableRtpRelay();
255     return;
256   }
257 
258   if (relay_enabled && other) {
259     stream->setRelayStream(other);
260     stream->setRelayPayloads(relay_mask);
261     if (!relay_paused)
262       stream->enableRtpRelay();
263 
264     stream->setRAddr(relay_address, relay_port, relay_port+1);
265   }
266   else {
267     // nothing to relay or other stream not set
268     stream->disableRtpRelay();
269   }
270 }
271 
272 void AudioStreamData::setRelayPayloads(const SdpMedia &m, RelayController *ctrl) {
273   ctrl->computeRelayMask(m, relay_enabled, relay_mask);
274 }
AmSessionAudioConnector()275 
276 void AudioStreamData::setRelayDestination(const string& connection_address, int port) {
277   relay_address = connection_address; relay_port = port;
278 }
279 
280 void AudioStreamData::setRelayPaused(bool paused) {
281   if (paused == relay_paused) {
282     DBG("relay already paused for stream [%p], ignoring\n", stream);
283     return;
~AmSessionAudioConnector()284   }
285 
286   relay_paused = paused;
287   DBG("relay %spaused, stream [%p]\n", relay_paused?"":"not ", stream);
288 
289   if (NULL != stream) {
290     if (relay_paused)
291       stream->disableRtpRelay();
292     else
293       stream->enableRtpRelay();
294   }
295 }
296 
297 void AudioStreamData::clearDtmfSink()
298 {
299   if (dtmf_detector) {
300     delete dtmf_detector;
301     dtmf_detector = NULL;
302   }
303   if (dtmf_queue) {
304     delete dtmf_queue;
305     dtmf_queue = NULL;
306   }
307 }
308 
309 void AudioStreamData::setDtmfSink(AmDtmfSink *dtmf_sink)
310 {
311   // TODO: optimize: clear & create the dtmf_detector only if the dtmf_sink changed
312   clearDtmfSink();
313 
314   if (dtmf_sink && stream) {
315     dtmf_detector = new AmDtmfDetector(dtmf_sink);
316     dtmf_queue = new AmDtmfEventQueue(dtmf_detector);
317     dtmf_detector->setInbandDetector(AmConfig::DefaultDTMFDetector, stream->getSampleRate());
318 
319     if(!enable_dtmf_transcoding && lowfi_payloads.size()) {
320       string selected_payload_name = stream->getPayloadName(stream->getPayloadType());
321       for(vector<SdpPayload>::iterator it = lowfi_payloads.begin();
322           it != lowfi_payloads.end(); ++it){
323         DBG("checking %s/%i PL type against %s/%i\n",
324             selected_payload_name.c_str(), stream->getPayloadType(),
325             it->encoding_name.c_str(), it->payload_type);
326         if(selected_payload_name == it->encoding_name) {
327           enable_dtmf_transcoding = true;
328           break;
329         }
330       }
331     }
332   }
333 }
334 
335 bool AudioStreamData::initStream(PlayoutType playout_type,
336     AmSdp &local_sdp, AmSdp &remote_sdp, int media_idx)
337 {
338   resetStats();
339 
340   if (!stream) {
341     initialized = false;
342     return false;
343   }
344 
345   // TODO: try to init only in case there are some payloads which can't be relayed
346   stream->forceSdpMediaIndex(media_idx);
347 
348   stream->setOnHold(false); // just hack to do correctly mute detection in stream->init
349   if (stream->init(local_sdp, remote_sdp, force_symmetric_rtp) == 0) {
350     stream->setPlayoutType(playout_type);
351     initialized = true;
352 
353 //    // do not unmute if muted because of 0.0.0.0 remote IP (the mute flag is set during init)
354 //    if (!stream->muted()) stream->setOnHold(muted);
355 
356   } else {
357     initialized = false;
358     DBG("stream initialization failed\n");
359     // there still can be payloads to be relayed (if all possible payloads are
360     // to be relayed this needs not to be an error)
361   }
362   stream->setOnHold(muted);
363   stream->setReceiving(receiving);
364 
365   return initialized;
366 }
367 
368 void AudioStreamData::sendDtmf(int event, unsigned int duration_ms)
369 {
370   if (stream) stream->sendDtmf(event,duration_ms);
371 }
372 
373 void AudioStreamData::resetStats()
374 {
375   if (outgoing_payload != UNDEFINED_PAYLOAD) {
376     b2b_stats.decCodecWriteUsage(outgoing_payload_name);
377     outgoing_payload = UNDEFINED_PAYLOAD;
378     outgoing_payload_name.clear();
379   }
380   if (incoming_payload != UNDEFINED_PAYLOAD) {
381     b2b_stats.decCodecReadUsage(incoming_payload_name);
382     incoming_payload = UNDEFINED_PAYLOAD;
383     incoming_payload_name.clear();
384   }
385 }
386 
387 void AudioStreamData::updateSendStats()
388 {
389   if (!initialized) {
390     resetStats();
391     return;
392   }
393 
394   int payload = stream->getPayloadType();
395   if (payload != outgoing_payload) {
396     // payload used to send has changed
397 
398     // decrement usage of previous payload if set
399     if (outgoing_payload != UNDEFINED_PAYLOAD)
400       b2b_stats.decCodecWriteUsage(outgoing_payload_name);
401 
402     if (payload != UNDEFINED_PAYLOAD) {
403       // remember payload name (in lowercase to simulate case insensitivity)
404       outgoing_payload_name = stream->getPayloadName(payload);
405       transform(outgoing_payload_name.begin(), outgoing_payload_name.end(),
406           outgoing_payload_name.begin(), ::tolower);
407       b2b_stats.incCodecWriteUsage(outgoing_payload_name);
408     }
409     else outgoing_payload_name.clear();
410     outgoing_payload = payload;
411   }
412 }
413 
414 void AudioStreamData::updateRecvStats(AmRtpStream *s)
415 {
416   if (!initialized) {
417     resetStats();
418     return;
419   }
420 
421   int payload = s->getLastPayload();
422   if (payload != incoming_payload) {
423     // payload used to send has changed
424 
425     // decrement usage of previous payload if set
426     if (incoming_payload != UNDEFINED_PAYLOAD)
427       b2b_stats.decCodecReadUsage(incoming_payload_name);
428 
429     if (payload != UNDEFINED_PAYLOAD) {
430       // remember payload name (in lowercase to simulate case insensitivity)
431       incoming_payload_name = stream->getPayloadName(payload);
432       transform(incoming_payload_name.begin(), incoming_payload_name.end(),
433           incoming_payload_name.begin(), ::tolower);
434       b2b_stats.incCodecReadUsage(incoming_payload_name);
435     }
436     else incoming_payload_name.clear();
437     incoming_payload = payload;
438   }
439 }
440 
441 int AudioStreamData::writeStream(unsigned long long ts, unsigned char *buffer, AudioStreamData &src)
442 {
443   if (!initialized) return 0;
444   if (stream->getOnHold()) return 0; // ignore hold streams?
445 
446   unsigned int f_size = stream->getFrameSize();
447   if (stream->sendIntReached(ts)) {
448     // A leg is ready to send data
449     int sample_rate = stream->getSampleRate();
450     int got = 0;
451     if (in) got = in->get(ts, buffer, sample_rate, f_size);
452     else {
453       if (!src.isInitialized()) return 0;
454       AmRtpAudio *src_stream = src.getStream();
455       if (src_stream->checkInterval(ts)) {
456         got = src_stream->get(ts, buffer, sample_rate, f_size);
457         if (got > 0) {
458           updateRecvStats(src_stream);
459           if (dtmf_queue && enable_dtmf_transcoding) {
460 	    dtmf_queue->putDtmfAudio(buffer, got, ts);
461 	  }
462         }
463       }
464     }
465     if (got < 0) return -1;
466     if (got > 0) {
467       // we have data to be sent
468       updateSendStats();
469       return stream->put(ts, buffer, sample_rate, got);
470     }
471   }
472   return 0;
473 }
474 
475 void AudioStreamData::mute(bool set_mute)
476 {
477   DBG("mute(%s) - RTP stream [%p]\n", set_mute?"true":"false", stream);
478 
479   if (stream) {
480     stream->setOnHold(set_mute);
481     if (muted != set_mute) stream->clearRTPTimeout();
482   }
483   muted = set_mute;
484 }
485 
486 void AudioStreamData::setReceiving(bool r) {
487   DBG("setReceiving(%s) - RTP stream [%p]\n", r?"true":"false", stream);
488   if (stream) {
489     stream->setReceiving(r);
490   }
491   receiving = r;
492 }
493 
494 //////////////////////////////////////////////////////////////////////////////////
495 
496 AmB2BMedia::RelayStreamPair::RelayStreamPair(AmB2BSession *_a, AmB2BSession *_b)
497 : a(_a, _a ? _a->getRtpInterface() : -1),
498   b(_b, _b ? _b->getRtpInterface() : -1)
499 {
500   a.enableRawRelay();
501   b.enableRawRelay();
502 }
503 
504 AmB2BMedia::AmB2BMedia(AmB2BSession *_a, AmB2BSession *_b):
505   a(_a),
506   b(_b),
507   callgroup(AmSession::getNewId()),
508   have_a_leg_local_sdp(false), have_a_leg_remote_sdp(false),
509   have_b_leg_local_sdp(false), have_b_leg_remote_sdp(false),
510   ref_cnt(0), // everybody who wants to use must add one reference itselves
511   playout_type(ADAPTIVE_PLAYOUT),
512   //playout_type(SIMPLE_PLAYOUT),
513   a_leg_muted(false), b_leg_muted(false),
514   relay_paused(false),
515   logger(NULL)
516 {
517 }
518 
519 AmB2BMedia::~AmB2BMedia()
520 {
521   if (logger) dec_ref(logger);
522 }
523 
524 void AmB2BMedia::addToMediaProcessor() {
525   addReference(); // AmMediaProcessor's reference
526   AmMediaProcessor::instance()->addSession(this, callgroup);
527 }
528 
529 void AmB2BMedia::addToMediaProcessorUnsafe() {
530   ref_cnt++; // AmMediaProcessor's reference
531   AmMediaProcessor::instance()->addSession(this, callgroup);
532 }
533 
534 void AmB2BMedia::addReference() {
535   mutex.lock();
536   ref_cnt++;
537   mutex.unlock();
538 }
539 
540 bool AmB2BMedia::releaseReference() {
541   mutex.lock();
542   int r = --ref_cnt;
543   mutex.unlock();
544   if (r==0) {
545     DBG("last reference to AmB2BMedia [%p] cleared, destroying\n", this);
546     delete this;
547   }
548   return (r == 0);
549 }
550 
551 void AmB2BMedia::changeSession(bool a_leg, AmB2BSession *new_session)
552 {
553   AmLock lock(mutex);
554   changeSessionUnsafe(a_leg, new_session);
555 }
556 
557 void AmB2BMedia::changeSessionUnsafe(bool a_leg, AmB2BSession *new_session)
558 {
559   TRACE("changing %s leg session to %p\n", a_leg ? "A" : "B", new_session);
560   if (a_leg) a = new_session;
561   else b = new_session;
562 
563   bool needs_processing = a && b && a->getRtpRelayMode() == AmB2BSession::RTP_Transcoding;
564 
565   // update all streams
566   for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
567     // stop processing first to avoid unexpected results
568     i->a.stopStreamProcessing();
569     i->b.stopStreamProcessing();
570 
571     // replace session
572     if (a_leg) {
573       i->a.changeSession(new_session);
574     }
575     else {
576       i->b.changeSession(new_session);
577     }
578 
579     updateStreamPair(*i);
580 
581     if (i->requiresProcessing()) needs_processing = true;
582 
583     // reset logger (needed if a stream changes)
584     i->setLogger(logger);
585 
586     // return back for processing if needed
587     i->a.resumeStreamProcessing();
588     i->b.resumeStreamProcessing();
589   }
590 
591   for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) {
592     AmRtpStream &a = (*j)->a;
593     AmRtpStream &b = (*j)->b;
594 
595     // FIXME: is stop & resume receiving needed here?
596     if (a_leg)
597       a.changeSession(new_session);
598     else
599       b.changeSession(new_session);
600   }
601 
602   if (needs_processing) {
603     if (!isProcessingMedia()) {
604       addToMediaProcessorUnsafe();
605     }
606   }
607   else if (isProcessingMedia()) AmMediaProcessor::instance()->removeSession(this);
608 
609   TRACE("session changed\n");
610 }
611 
612 int AmB2BMedia::writeStreams(unsigned long long ts, unsigned char *buffer)
613 {
614   int res = 0;
615   AmLock lock(mutex);
616   for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
617     if (i->a.writeStream(ts, buffer, i->b) < 0) { res = -1; break; }
618     if (i->b.writeStream(ts, buffer, i->a) < 0) { res = -1; break; }
619   }
620   return res;
621 }
622 
623 void AmB2BMedia::processDtmfEvents()
624 {
625   AmLock lock(mutex);
626   for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
627     i->a.processDtmfEvents();
628     i->b.processDtmfEvents();
629   }
630 
631   if (a) a->processDtmfEvents();
632   if (b) b->processDtmfEvents();
633 }
634 
635 void AmB2BMedia::sendDtmf(bool a_leg, int event, unsigned int duration_ms)
636 {
637   AmLock lock(mutex);
638   if(!audio.size())
639     return;
640 
641   // send the DTMFs using the first available stream
642   if(a_leg) {
643     audio[0].a.sendDtmf(event,duration_ms);
644   }
645   else {
646     audio[0].b.sendDtmf(event,duration_ms);
647   }
648 }
649 
650 void AmB2BMedia::clearAudio(bool a_leg)
651 {
652   TRACE("clear %s leg audio\n", a_leg ? "A" : "B");
653   AmLock lock(mutex);
654 
655   for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
656     // remove streams from AmRtpReceiver first! (always both?)
657     i->a.stopStreamProcessing();
658     i->b.stopStreamProcessing();
659     if (a_leg) {
660       i->b.setRelayStream(NULL);
661       i->a.clear();
662     }
663     else {
664       i->a.setRelayStream(NULL);
665       i->b.clear();
666     }
667   }
668 
669   for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) {
670     if ((*j)->a.hasLocalSocket())
671       AmRtpReceiver::instance()->removeStream((*j)->a.getLocalSocket(), (*j)->a.getLocalPort());
672     if ((*j)->b.hasLocalSocket())
673       AmRtpReceiver::instance()->removeStream((*j)->b.getLocalSocket(), (*j)->b.getLocalPort());
674   }
675 
676   // forget sessions to avoid using them once clearAudio is called
677   changeSessionUnsafe(a_leg, NULL);
678 
679   if (!a && !b) {
680     audio.clear(); // both legs cleared
681     for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) {
682       delete *j;
683     }
684     relay_streams.clear();
685   }
686 }
687 
688 void AmB2BMedia::clearRTPTimeout()
689 {
690   AmLock lock(mutex);
691 
692   for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
693     i->a.clearRTPTimeout();
694     i->b.clearRTPTimeout();
695   }
696 }
697 
698 bool AmB2BMedia::canRelay(const SdpMedia &m)
699 {
700   return (m.transport == TP_RTPAVP) ||
701     (m.transport == TP_RTPSAVP) ||
702     (m.transport == TP_UDP) ||
703     (m.transport == TP_UDPTL);
704 }
705 
706 void AmB2BMedia::createStreams(const AmSdp &sdp)
707 {
708   AudioStreamIterator astreams = audio.begin();
709   RelayStreamIterator rstreams = relay_streams.begin();
710   vector<SdpMedia>::const_iterator m = sdp.media.begin();
711   int idx = 0;
712   bool create_audio = astreams == audio.end();
713   bool create_relay = rstreams == relay_streams.end();
714 
715   for (; m != sdp.media.end(); ++m, ++idx) {
716 
717     // audio streams
718     if (m->type == MT_AUDIO) {
719       if (create_audio) {
720         AudioStreamPair pair(a, b, idx);
721         pair.a.mute(a_leg_muted);
722         pair.b.mute(b_leg_muted);
723         audio.push_back(pair);
724         audio.back().setLogger(logger);
725       }
726       else if (++astreams == audio.end()) create_audio = true; // we went through the last audio stream
727     }
728 
729     // non-audio streams that we can relay
730     else if(canRelay(*m))
731     {
732       if (create_relay) {
733 	relay_streams.push_back(new RelayStreamPair(a, b));
734         relay_streams.back()->setLogger(logger);
735       }
736       else if (++rstreams == relay_streams.end()) create_relay = true; // we went through the last relay stream
737     }
738   }
739 }
740 
741 void AmB2BMedia::replaceConnectionAddress(AmSdp &parser_sdp, bool a_leg,
742 					  const string& relay_address,
743 					  const string& relay_public_address)
744 {
745   AmLock lock(mutex);
746 
747   SdpConnection orig_conn = parser_sdp.conn; // needed for the 'quick workaround' for non-audio media
748 
749   // place relay_address in connection address
750   if (!parser_sdp.conn.address.empty() && (parser_sdp.conn.address != zero_ip)) {
751     parser_sdp.conn.address = relay_public_address;
752     DBG("new connection address: %s",parser_sdp.conn.address.c_str());
753   }
754 
755   // we need to create streams if they are not already created
756   createStreams(parser_sdp);
757 
758   string replaced_ports;
759 
760   AudioStreamIterator audio_stream_it = audio.begin();
761   RelayStreamIterator relay_stream_it = relay_streams.begin();
762 
763   std::vector<SdpMedia>::iterator it = parser_sdp.media.begin();
764   for (; it != parser_sdp.media.end() ; ++it) {
765 
766     // FIXME: only UDP streams are handled for now
767     if (it->type == MT_AUDIO) {
768 
769       if( audio_stream_it == audio.end() ){
770 	// strange... we should actually have a stream for this media line...
771 	DBG("audio media line does not have coresponding audio stream...\n");
772 	continue;
773       }
774 
775       if(it->port) { // if stream active
776 	if (!it->conn.address.empty() && (parser_sdp.conn.address != zero_ip)) {
777 	  it->conn.address = relay_public_address;
778 	  DBG("new stream connection address: %s",it->conn.address.c_str());
779 	}
780 	try {
781 	  if (a_leg) {
782 	    audio_stream_it->a.setLocalIP(relay_address);
783 	    it->port = audio_stream_it->a.getLocalPort();
784             replaceRtcpAttr(*it, relay_address, audio_stream_it->a.getLocalRtcpPort());
785 	  }
786 	  else {
787 	    audio_stream_it->b.setLocalIP(relay_address);
788 	    it->port = audio_stream_it->b.getLocalPort();
789             replaceRtcpAttr(*it, relay_address, audio_stream_it->b.getLocalRtcpPort());
790 	  }
791 	  if(!replaced_ports.empty()) replaced_ports += "/";
792 	  replaced_ports += int2str(it->port);
793 	} catch (const string& s) {
794 	  ERROR("setting port: '%s'\n", s.c_str());
795 	  throw string("error setting RTP port\n");
796 	}
797       }
798       ++audio_stream_it;
799     }
800     else if(canRelay(*it)) {
801 
802       if( relay_stream_it == relay_streams.end() ){
803 	// strange... we should actually have a stream for this media line...
804 	DBG("media line does not have a coresponding relay stream...\n");
805 	continue;
806       }
807 
808       if(it->port) { // if stream active
809 	if (!it->conn.address.empty() && (parser_sdp.conn.address != zero_ip)) {
810 	  it->conn.address = relay_public_address;
811 	  DBG("new stream connection address: %s",it->conn.address.c_str());
812 	}
813 	try {
814 	  if (a_leg) {
815 	    if(!(*relay_stream_it)->a.hasLocalSocket()){
816 	      (*relay_stream_it)->a.setLocalIP(relay_address);
817 	    }
818 	    it->port = (*relay_stream_it)->a.getLocalPort();
819             replaceRtcpAttr(*it, relay_address, (*relay_stream_it)->a.getLocalRtcpPort());
820 	  }
821 	  else {
822 	    if(!(*relay_stream_it)->b.hasLocalSocket()){
823 	      (*relay_stream_it)->b.setLocalIP(relay_address);
824 	    }
825 	    it->port = (*relay_stream_it)->b.getLocalPort();
826             replaceRtcpAttr(*it, relay_address, (*relay_stream_it)->b.getLocalRtcpPort());
827 	  }
828 	  if(!replaced_ports.empty()) replaced_ports += "/";
829 	  replaced_ports += int2str(it->port);
830 	} catch (const string& s) {
831 	  ERROR("setting port: '%s'\n", s.c_str());
832 	  throw string("error setting RTP port\n");
833 	}
834       }
835       ++relay_stream_it;
836     }
837     else {
838       // quick workaround to allow direct connection of non-supported streams (i.e.
839       // those which are not relayed or transcoded): propagate connection
840       // address - might work but need not (to be tested with real clients
841       // instead of simulators)
842       if (it->conn.address.empty()) it->conn = orig_conn;
843       continue;
844     }
845   }
846 
847   if (it != parser_sdp.media.end()) {
848     // FIXME: create new streams here?
849     WARN("trying to relay SDP with more media lines than "
850 	 "relay streams initialized (%zu)\n",audio.size()+relay_streams.size());
851   }
852 
853   DBG("replaced connection address in SDP with %s:%s.\n",
854       relay_public_address.c_str(), replaced_ports.c_str());
855 }
856 
857 void AmB2BMedia::updateStreamPair(AudioStreamPair &pair)
858 {
859   bool have_a = have_a_leg_local_sdp && have_a_leg_remote_sdp;
860   bool have_b = have_b_leg_local_sdp && have_b_leg_remote_sdp;
861 
862   TRACE("updating stream in A leg\n");
863   pair.a.setDtmfSink(b);
864   if (pair.b.getInput()) pair.a.setRelayStream(NULL); // don't mix relayed RTP into the other's input
865   else pair.a.setRelayStream(pair.b.getStream());
866   if (have_a) pair.a.initStream(playout_type, a_leg_local_sdp, a_leg_remote_sdp, pair.media_idx);
867 
868   TRACE("updating stream in B leg\n");
869   pair.b.setDtmfSink(a);
870   if (pair.a.getInput()) pair.b.setRelayStream(NULL); // don't mix relayed RTP into the other's input
871   else pair.b.setRelayStream(pair.a.getStream());
872   if (have_b) pair.b.initStream(playout_type, b_leg_local_sdp, b_leg_remote_sdp, pair.media_idx);
873 
874   TRACE("audio streams updated\n");
875 }
876 
877 void AmB2BMedia::updateAudioStreams()
878 {
879   // SDP was updated
880   TRACE("handling SDP change, A leg: %c%c, B leg: %c%c\n",
881       have_a_leg_local_sdp ? 'X' : '-',
882       have_a_leg_remote_sdp ? 'X' : '-',
883       have_b_leg_local_sdp ? 'X' : '-',
884       have_b_leg_remote_sdp ? 'X' : '-');
885 
886   // if we have all necessary information we can initialize streams and start
887   // their processing
888   if (audio.empty() && relay_streams.empty()) return; // no streams
889 
890   bool have_a = have_a_leg_local_sdp && have_a_leg_remote_sdp;
891   bool have_b = have_b_leg_local_sdp && have_b_leg_remote_sdp;
892 
893   if (!(
894       (have_a || have_b)
895       )) return;
896 
897   bool needs_processing = a && b && a->getRtpRelayMode() == AmB2BSession::RTP_Transcoding;
898 
899   // initialize streams to be able to relay & transcode (or use local audio)
900   for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
901     i->a.stopStreamProcessing();
902     i->b.stopStreamProcessing();
903 
904     updateStreamPair(*i);
905 
906     if (i->requiresProcessing()) needs_processing = true;
907 
908     i->a.resumeStreamProcessing();
909     i->b.resumeStreamProcessing();
910   }
911 
912   // start media processing (only if transcoding or regular audio processing
913   // required)
914   // Note: once we send local SDP to the other party we have to expect RTP but
915   // we need to be fully initialised (both legs) before we can correctly handle
916   // the media, right?
917   if (needs_processing) {
918     if (!isProcessingMedia()) {
919       addToMediaProcessorUnsafe();
920     }
921   }
922   else if (isProcessingMedia()) AmMediaProcessor::instance()->removeSession(this);
923 }
924 
925 void AmB2BMedia::updateRelayStream(AmRtpStream *stream, AmB2BSession *session,
926 				   const string& connection_address,
927 				   const SdpMedia &m, AmRtpStream *relay_to)
928 {
929   static const PayloadMask true_mask(true);
930 
931   stream->stopReceiving();
932   if(m.port) {
933     if (session) {
934       // propagate session settings
935       stream->setPassiveMode(session->getRtpRelayForceSymmetricRtp());
936       stream->setRtpRelayTransparentSeqno(session->getRtpRelayTransparentSeqno());
937       stream->setRtpRelayTransparentSSRC(session->getRtpRelayTransparentSSRC());
938       // if (!stream->hasLocalSocket()) stream->setLocalIP(session->advertisedIP());
939     }
940     stream->setRelayStream(relay_to);
941     stream->setRelayPayloads(true_mask);
942     if (!relay_paused)
943       stream->enableRtpRelay();
944     stream->setRAddr(connection_address,m.port,m.port+1);
945     if((m.transport != TP_RTPAVP) && (m.transport != TP_RTPSAVP))
946       stream->enableRawRelay();
947     stream->resumeReceiving();
948   }
949   else {
950     DBG("disabled stream");
951   }
952 }
953 
954 void AmB2BMedia::updateStreams(bool a_leg, const AmSdp &local_sdp, const AmSdp &remote_sdp, RelayController *ctrl)
955 {
956   TRACE("%s (%c): updating streams with local & remote SDP\n",
957       a_leg ? (a ? a->getLocalTag().c_str() : "NULL") : (b ? b->getLocalTag().c_str() : "NULL"),
958       a_leg ? 'A': 'B');
959 
960   /*string s;
961   local_sdp.print(s);
962   INFO("local SDP: %s\n", s.c_str());
963   remote_sdp.print(s);
964   INFO("remote SDP: %s\n", s.c_str());*/
965 
966   AmLock lock(mutex);
967   // streams should be created already (replaceConnectionAddress called
968   // before updateLocalSdp uses/assignes their port numbers)
969 
970   // save SDP: FIXME: really needed to store instead of just to use?
971   if (a_leg) {
972     a_leg_local_sdp = local_sdp;
973     a_leg_remote_sdp = remote_sdp;
974     have_a_leg_local_sdp = true;
975     have_a_leg_remote_sdp = true;
976   }
977   else {
978     b_leg_local_sdp = local_sdp;
979     b_leg_remote_sdp = remote_sdp;
980     have_b_leg_local_sdp = true;
981     have_b_leg_remote_sdp = true;
982   }
983 
984   // create missing streams
985   createStreams(local_sdp); // FIXME: remote_sdp?
986 
987   // compute relay mask for every stream
988   // Warning: do not apply the new mask unless the offer answer succeeds?
989   // we can safely apply the changes once we have local & remote SDP (i.e. the
990   // negotiation is finished) otherwise we might handle the RTP in a wrong way
991 
992   AudioStreamIterator astream = audio.begin();
993   RelayStreamIterator rstream = relay_streams.begin();
994   int local_media_count = std::distance(local_sdp.media.begin(),
995 					local_sdp.media.end());
996   for (vector<SdpMedia>::const_iterator m = remote_sdp.media.begin(); m != remote_sdp.media.end(); ++m) {
997     if (local_media_count == 0) break;
998     const string& connection_address = (m->conn.address.empty() ? remote_sdp.conn.address : m->conn.address);
999 
1000     if (m->type == MT_AUDIO) {
1001       // initialize relay mask in the other(!) leg and relay destination for stream in current leg
1002       TRACE("relay payloads in direction %s\n", a_leg ? "B -> A" : "A -> B");
1003       if (a_leg) {
1004         astream->b.setRelayPayloads(*m, ctrl);
1005         astream->a.setRelayDestination(connection_address, m->port);
1006       }
1007       else {
1008         astream->a.setRelayPayloads(*m, ctrl);
1009         astream->b.setRelayDestination(connection_address, m->port);
1010       }
1011       ++astream;
1012     }
1013 
1014     else {
1015       if (!canRelay(*m)) continue;
1016       if (rstream == relay_streams.end()) continue;
1017 
1018       RelayStreamPair& relay_stream = **rstream;
1019 
1020       if(a_leg) {
1021 	DBG("updating A-leg relay_stream");
1022         updateRelayStream(&relay_stream.a, a, connection_address, *m, &relay_stream.b);
1023       }
1024       else {
1025 	DBG("updating B-leg relay_stream");
1026         updateRelayStream(&relay_stream.b, b, connection_address, *m, &relay_stream.a);
1027       }
1028       ++rstream;
1029     }
1030 
1031     local_media_count--;
1032   }
1033 
1034   updateAudioStreams();
1035 
1036   TRACE("streams updated with SDP\n");
1037 }
1038 
1039 void AmB2BMedia::stop(bool a_leg)
1040 {
1041   TRACE("stop %s leg\n", a_leg ? "A" : "B");
1042   clearAudio(a_leg);
1043   // remove from processor only if both A and B leg stopped
1044   if (isProcessingMedia() && (!a) && (!b)) {
1045     AmMediaProcessor::instance()->removeSession(this);
1046   }
1047 }
1048 
1049 void AmB2BMedia::onMediaProcessingTerminated()
1050 {
1051   AmMediaSession::onMediaProcessingTerminated();
1052 
1053   // release reference held by AmMediaProcessor
1054   releaseReference();
1055 }
1056 
1057 bool AmB2BMedia::replaceOffer(AmSdp &sdp, bool a_leg)
1058 {
1059   TRACE("replacing offer with a local one\n");
1060   createStreams(sdp); // create missing streams
1061 
1062   AmLock lock(mutex);
1063 
1064   try {
1065 
1066     AudioStreamIterator as = audio.begin();
1067     for (vector<SdpMedia>::iterator m = sdp.media.begin(); m != sdp.media.end(); ++m) {
1068       if (m->type == MT_AUDIO && as != audio.end()) {
1069         // generate our local offer
1070         TRACE("... making audio stream offer\n");
1071         if (a_leg) as->a.getSdpOffer(as->media_idx, *m);
1072         else as->b.getSdpOffer(as->media_idx, *m);
1073         ++as;
1074       }
1075       else {
1076         TRACE("... making non-audio/uninitialised stream inactive\n");
1077         m->send = false;
1078         m->recv = false;
1079       }
1080     }
1081 
1082   }
1083   catch (...) {
1084     TRACE("hold SDP offer creation failed\n");
1085     return true;
1086   }
1087 
1088   TRACE("hold SDP offer generated\n");
1089 
1090   return true;
1091 }
1092 
1093 void AmB2BMedia::setMuteFlag(bool a_leg, bool set)
1094 {
1095   AmLock lock(mutex);
1096   if (a_leg) a_leg_muted = set;
1097   else b_leg_muted = set;
1098   for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
1099     if (a_leg) i->a.mute(set);
1100     else i->b.mute(set);
1101   }
1102 }
1103 
1104 void AmB2BMedia::setFirstStreamInput(bool a_leg, AmAudio *in)
1105 {
1106   AmLock lock(mutex);
1107   //for ( i != audio.end(); ++i) {
1108   if (!audio.empty()) {
1109     AudioStreamIterator i = audio.begin();
1110     if (a_leg) i->a.setInput(in);
1111     else i->b.setInput(in);
1112     updateAudioStreams();
1113   }
1114   else {
1115     if (in) {
1116       ERROR("BUG: can't set %s leg's first stream input, no streams\n", a_leg ? "A": "B");
1117     }
1118   }
1119 }
1120 
1121 void AmB2BMedia::createHoldAnswer(bool a_leg, const AmSdp &offer, AmSdp &answer, bool use_zero_con)
1122 {
1123   // because of possible RTP relaying our payloads need not to match the remote
1124   // party's payloads (i.e. we might need not understand the remote party's
1125   // codecs)
1126   // As a quick hack we may use just copy of the original SDP with all streams
1127   // deactivated to avoid sending RTP to us (twinkle requires at least one
1128   // non-disabled stream in the response so we can not set all ports to 0 to
1129   // signalize that we don't want to receive anything)
1130 
1131   AmLock lock(mutex);
1132 
1133   answer = offer;
1134   answer.media.clear();
1135 
1136   if (use_zero_con) answer.conn.address = zero_ip;
1137   else {
1138     if (a_leg) { if (a) answer.conn.address = a->advertisedIP(); }
1139     else { if (b) answer.conn.address = b->advertisedIP(); }
1140 
1141     if (answer.conn.address.empty()) answer.conn.address = zero_ip; // we need something there
1142   }
1143 
1144   AudioStreamIterator i = audio.begin();
1145   vector<SdpMedia>::const_iterator m;
1146   for (m = offer.media.begin(); m != offer.media.end(); ++m) {
1147     answer.media.push_back(SdpMedia());
1148     SdpMedia &media = answer.media.back();
1149     media.type = m->type;
1150 
1151     if (media.type != MT_AUDIO) { media = *m ; media.port = 0; continue; } // copy whole media line except port
1152     if (m->port == 0) { media = *m; ++i; continue; } // copy whole inactive media line
1153 
1154     if (a_leg) i->a.getSdpAnswer(i->media_idx, *m, media);
1155     else i->b.getSdpAnswer(i->media_idx, *m, media);
1156 
1157     media.send = false; // should be already because the stream should be on hold
1158     media.recv = false; // what we would do with received data?
1159 
1160     if (media.payloads.empty()) {
1161       // we have to add something there
1162       if (!m->payloads.empty()) media.payloads.push_back(m->payloads[0]);
1163     }
1164     break;
1165   }
1166 }
1167 
1168 void AmB2BMedia::setRtpLogger(msg_logger* _logger)
1169 {
1170   if (logger) dec_ref(logger);
1171   logger = _logger;
1172   if (logger) inc_ref(logger);
1173 
1174   // walk through all the streams and use logger for them
1175   for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) i->setLogger(logger);
1176   for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) (*j)->setLogger(logger);
1177 }
1178 
1179 void AmB2BMedia::setRelayDTMFReceiving(bool enabled) {
1180   DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size());
1181   for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) {
1182     DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", &(*j)->a);
1183     DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", &(*j)->b);
1184     (*j)->a.force_receive_dtmf = enabled;
1185     (*j)->b.force_receive_dtmf = enabled;
1186   }
1187 
1188   for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) {
1189     DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", j->a.getStream());
1190     DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", j->b.getStream());
1191     if (NULL != j->a.getStream())
1192       j->a.getStream()->force_receive_dtmf = enabled;
1193 
1194     if (NULL != j->b.getStream())
1195       j->b.getStream()->force_receive_dtmf = enabled;
1196   }
1197 }
1198 
1199 /** set receving of RTP/relay streams (not receiving=drop incoming packets) */
1200 void AmB2BMedia::setReceiving(bool receiving_a, bool receiving_b) {
1201   AmLock lock(mutex); // TODO: is this necessary?
1202 
1203   DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size());
1204   for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) {
1205     DBG("setReceiving(%s) A relay stream [%p]\n", receiving_a?"true":"false", &(*j)->a);
1206     (*j)->a.setReceiving(receiving_a);
1207     DBG("setReceiving(%s) B relay stream [%p]\n", receiving_b?"true":"false", &(*j)->b);
1208     (*j)->b.setReceiving(receiving_b);
1209   }
1210 
1211   for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) {
1212     DBG("setReceiving(%s) A audio stream [%p]\n", receiving_a?"true":"false", j->a.getStream());
1213     j->a.setReceiving(receiving_a);
1214     DBG("setReceiving(%s) B audio stream [%p]\n", receiving_b?"true":"false", j->b.getStream());
1215     j->b.setReceiving(receiving_b);
1216   }
1217 
1218 }
1219 
1220 void AmB2BMedia::pauseRelay() {
1221   DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size());
1222   relay_paused = true;
1223   for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) {
1224     (*j)->a.disableRawRelay();
1225     (*j)->b.disableRawRelay();
1226   }
1227 
1228   for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) {
1229     j->a.setRelayPaused(true);
1230     j->b.setRelayPaused(true);
1231   }
1232 }
1233 
1234 void AmB2BMedia::restartRelay() {
1235   DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size());
1236   relay_paused = false;
1237   for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) {
1238     (*j)->a.enableRawRelay();
1239     (*j)->b.enableRawRelay();
1240   }
1241 
1242   for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) {
1243     j->a.setRelayPaused(false);
1244     j->b.setRelayPaused(false);
1245   }
1246 }
1247 
1248 void AudioStreamData::debug()
1249 {
1250   DBG("\tmuted: %s\n", muted ? "yes" : "no");
1251   if(stream) {
1252     stream->debug();
1253   }
1254   else
1255     DBG("\t<null> <-> <null>");
1256 }
1257 
1258 // print debug info
1259 void AmB2BMedia::debug()
1260 {
1261   // walk through all the streams
1262   DBG("B2B media session %p ('%s' <-> '%s'):",
1263       this,
1264       a ? a->getLocalTag().c_str() : "?",
1265       b ? b->getLocalTag().c_str() : "?");
1266   DBG("\tOA status: %c%c / %c%c",
1267       have_a_leg_local_sdp ? 'X' : '-',
1268       have_a_leg_remote_sdp ? 'X' : '-',
1269       have_b_leg_local_sdp ? 'X' : '-',
1270       have_b_leg_remote_sdp ? 'X' : '-');
1271 
1272   for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
1273     DBG(" - audio stream (A):\n");
1274     i->a.debug();
1275     DBG(" - audio stream (B):\n");
1276     i->b.debug();
1277   }
1278 
1279   for (RelayStreamIterator j = relay_streams.begin();
1280        j != relay_streams.end(); ++j) {
1281 
1282     DBG(" - relay stream (A):\n");
1283     (*j)->a.debug();
1284     DBG(" - relay stream (B):\n");
1285     (*j)->b.debug();
1286   }
1287 }
1288