1 #include "AmB2BMedia.h"
2 #include "AmAudio.h"
3 #include "amci/codecs.h"
4 #include <string.h>
5 #include <strings.h>
6 #include "AmB2BSession.h"
7 #include "AmRtpReceiver.h"
8 #include "sip/msg_logger.h"
9
10 #include <algorithm>
11 #include <stdexcept>
12
13 using namespace std;
14
15 #define TRACE DBG
16 #define UNDEFINED_PAYLOAD (-1)
17
18 /** class for computing payloads for relay the simpliest way - allow relaying of
19 * all payloads supported by remote party */
20 static B2BMediaStatistics b2b_stats;
21
22 static const string zero_ip("0.0.0.0");
23
24 static void replaceRtcpAttr(SdpMedia &m, const string& relay_address, int rtcp_port)
25 {
26 for (std::vector<SdpAttribute>::iterator a = m.attributes.begin(); a != m.attributes.end(); ++a) {
27 try {
28 if (a->attribute == "rtcp") {
29 RtcpAddress addr(a->value);
30 addr.setPort(rtcp_port);
31 if (addr.hasAddress()) addr.setAddress(relay_address);
32 a->value = addr.print();
33 }
34 }
35 catch (const exception &e) {
36 DBG("can't replace RTCP address: %s\n", e.what());
37 }
38 }
39 }
40
41 //////////////////////////////////////////////////////////////////////////////////
42
43 void B2BMediaStatistics::incCodecWriteUsage(const string &codec_name)
44 {
45 if (codec_name.empty()) return;
46
47 AmLock lock(mutex);
48 map<string, int>::iterator i = codec_write_usage.find(codec_name);
49 if (i != codec_write_usage.end()) i->second++;
50 else codec_write_usage[codec_name] = 1;
B2ABEventB2ABEvent51 }
52
53 void B2BMediaStatistics::decCodecWriteUsage(const string &codec_name)
54 {
55 if (codec_name.empty()) return;
56
57 AmLock lock(mutex);
58 map<string, int>::iterator i = codec_write_usage.find(codec_name);
59 if (i != codec_write_usage.end()) {
60 if (i->second > 0) i->second--;
61 }
62 }
63
64 void B2BMediaStatistics::incCodecReadUsage(const string &codec_name)
65 {
B2ABConnectAudioEventB2ABConnectAudioEvent66 if (codec_name.empty()) return;
67
68 AmLock lock(mutex);
69 map<string, int>::iterator i = codec_read_usage.find(codec_name);
70 if (i != codec_read_usage.end()) i->second++;
71 else codec_read_usage[codec_name] = 1;
72 }
73
B2ABConnectEarlyAudioEventB2ABConnectEarlyAudioEvent74 void B2BMediaStatistics::decCodecReadUsage(const string &codec_name)
75 {
76 if (codec_name.empty()) return;
77
78 AmLock lock(mutex);
79 map<string, int>::iterator i = codec_read_usage.find(codec_name);
80 if (i != codec_read_usage.end()) {
81 if (i->second > 0) i->second--;
82 }
83 }
84
85 B2BMediaStatistics *B2BMediaStatistics::instance()
86 {
87 return &b2b_stats;
88 }
89
90 void B2BMediaStatistics::reportCodecWriteUsage(string &dst)
91 {
92 if (codec_write_usage.empty()) {
93 dst = "pcma=0"; // to be not empty
94 return;
B2ABEventB2ABConnectLegEvent95 }
96
97 bool first = true;
98 dst.clear();
99 AmLock lock(mutex);
100 for (map<string, int>::iterator i = codec_write_usage.begin();
101 i != codec_write_usage.end(); ++i)
102 {
103 if (first) first = false;
104 else dst += ",";
105 dst += i->first;
106 dst += "=";
107 dst += int2str(i->second);
108 }
109 }
110
B2ABConnectOtherLegExceptionEventB2ABConnectOtherLegExceptionEvent111 void B2BMediaStatistics::reportCodecReadUsage(string &dst)
112 {
113 if (codec_read_usage.empty()) {
114 dst = "pcma=0"; // to be not empty
115 return;
116 }
117
118 bool first = true;
119 dst.clear();
120 AmLock lock(mutex);
121 for (map<string, int>::iterator i = codec_read_usage.begin();
122 i != codec_read_usage.end(); ++i)
123 {
124 if (first) first = false;
125 else dst += ",";
126 dst += i->first;
127 dst += "=";
128 dst += int2str(i->second);
129 }
130 }
131
132 void B2BMediaStatistics::getReport(const AmArg &args, AmArg &ret)
133 {
134 AmArg write_usage;
135 AmArg read_usage;
136
137 { // locked area
138 AmLock lock(mutex);
139
140 for (map<string, int>::iterator i = codec_write_usage.begin();
141 i != codec_write_usage.end(); ++i)
142 {
143 AmArg avp;
144 avp["codec"] = i->first;
145 avp["count"] = i->second;
146 write_usage.push(avp);
147 }
148
149 for (map<string, int>::iterator i = codec_read_usage.begin();
150 i != codec_read_usage.end(); ++i)
151 {
152 AmArg avp;
153 avp["codec"] = i->first;
154 avp["count"] = i->second;
155 read_usage.push(avp);
156 }
157 }
158
159 ret["write"] = write_usage;
160 ret["read"] = read_usage;
161 }
162
163 //////////////////////////////////////////////////////////////////////////////////
164
165 void AudioStreamData::initialize(AmB2BSession *session)
166 {
167 stream = new AmRtpAudio(session, session->getRtpInterface());
168 stream->setRtpRelayTransparentSeqno(session->getRtpRelayTransparentSeqno());
169 stream->setRtpRelayTransparentSSRC(session->getRtpRelayTransparentSSRC());
170 stream->setRtpRelayFilterRtpDtmf(session->getEnableDtmfRtpFiltering());
171 if (session->getEnableDtmfRtpDetection())
172 stream->force_receive_dtmf = true;
173 force_symmetric_rtp = session->getRtpRelayForceSymmetricRtp();
174 enable_dtmf_transcoding = session->getEnableDtmfTranscoding();
175 session->getLowFiPLs(lowfi_payloads);
176 stream->setLocalIP(session->localMediaIP());
177 stream->setRtpMuxRemote(session->getRtpMuxRemoteIP(), session->getRtpMuxRemotePort());
178 }
179
180 AudioStreamData::AudioStreamData(AmB2BSession *session):
181 in(NULL), initialized(false),
182 force_symmetric_rtp(false), enable_dtmf_transcoding(false),
183 dtmf_detector(NULL), dtmf_queue(NULL),
184 relay_enabled(false),
185 relay_port(0),
186 relay_paused(false),
187 muted(false), receiving(true),
188 outgoing_payload(UNDEFINED_PAYLOAD), incoming_payload(UNDEFINED_PAYLOAD)
189 {
190 if (session) initialize(session);
191 else stream = NULL; // not initialized yet
192 }
193
194 void AudioStreamData::changeSession(AmB2BSession *session)
195 {
196 if (!stream) {
197 // the stream was not created yet
198 TRACE("delayed stream initialization for session %p\n", session);
199 if (session) initialize(session);
200 }
201 else {
202 // the stream is already created
203
204 if (session) {
205 stream->changeSession(session);
206
207 /* FIXME: do we want to reinitialize the stream?
208 stream->setRtpRelayTransparentSeqno(session->getRtpRelayTransparentSeqno());
209 stream->setRtpRelayTransparentSSRC(session->getRtpRelayTransparentSSRC());
210 force_symmetric_rtp = session->getRtpRelayForceSymmetricRtp();
211 enable_dtmf_transcoding = session->getEnableDtmfTranscoding();
212 session->getLowFiPLs(lowfi_payloads);
213 stream->setLocalIP(session->localMediaIP());
214 ...
215 }*/
216 }
217 else clear(); // free the stream and other stuff because it can't be used anyway
218 }
219 }
getCalleeStatus()220
221
222 void AudioStreamData::clear()
223 {
224 resetStats();
225 if (in) {
226 //in->close();
227 //delete in;
228 in = NULL;
229 }
230 if (stream) {
231 delete stream;
232 stream = NULL;
233 }
234 clearDtmfSink();
235 initialized = false;
236 }
237
238 void AudioStreamData::stopStreamProcessing()
239 {
240 if (stream) stream->stopReceiving();
241 }
242
243 void AudioStreamData::resumeStreamProcessing()
244 {
245 if (stream) stream->resumeReceiving();
246 }
247
248 void AudioStreamData::setRelayStream(AmRtpStream *other)
249 {
250 if (!stream) return;
251
252 if (relay_address.empty()) {
253 DBG("not setting relay for empty relay address\n");
254 stream->disableRtpRelay();
255 return;
256 }
257
258 if (relay_enabled && other) {
259 stream->setRelayStream(other);
260 stream->setRelayPayloads(relay_mask);
261 if (!relay_paused)
262 stream->enableRtpRelay();
263
264 stream->setRAddr(relay_address, relay_port, relay_port+1);
265 }
266 else {
267 // nothing to relay or other stream not set
268 stream->disableRtpRelay();
269 }
270 }
271
272 void AudioStreamData::setRelayPayloads(const SdpMedia &m, RelayController *ctrl) {
273 ctrl->computeRelayMask(m, relay_enabled, relay_mask);
274 }
AmSessionAudioConnector()275
276 void AudioStreamData::setRelayDestination(const string& connection_address, int port) {
277 relay_address = connection_address; relay_port = port;
278 }
279
280 void AudioStreamData::setRelayPaused(bool paused) {
281 if (paused == relay_paused) {
282 DBG("relay already paused for stream [%p], ignoring\n", stream);
283 return;
~AmSessionAudioConnector()284 }
285
286 relay_paused = paused;
287 DBG("relay %spaused, stream [%p]\n", relay_paused?"":"not ", stream);
288
289 if (NULL != stream) {
290 if (relay_paused)
291 stream->disableRtpRelay();
292 else
293 stream->enableRtpRelay();
294 }
295 }
296
297 void AudioStreamData::clearDtmfSink()
298 {
299 if (dtmf_detector) {
300 delete dtmf_detector;
301 dtmf_detector = NULL;
302 }
303 if (dtmf_queue) {
304 delete dtmf_queue;
305 dtmf_queue = NULL;
306 }
307 }
308
309 void AudioStreamData::setDtmfSink(AmDtmfSink *dtmf_sink)
310 {
311 // TODO: optimize: clear & create the dtmf_detector only if the dtmf_sink changed
312 clearDtmfSink();
313
314 if (dtmf_sink && stream) {
315 dtmf_detector = new AmDtmfDetector(dtmf_sink);
316 dtmf_queue = new AmDtmfEventQueue(dtmf_detector);
317 dtmf_detector->setInbandDetector(AmConfig::DefaultDTMFDetector, stream->getSampleRate());
318
319 if(!enable_dtmf_transcoding && lowfi_payloads.size()) {
320 string selected_payload_name = stream->getPayloadName(stream->getPayloadType());
321 for(vector<SdpPayload>::iterator it = lowfi_payloads.begin();
322 it != lowfi_payloads.end(); ++it){
323 DBG("checking %s/%i PL type against %s/%i\n",
324 selected_payload_name.c_str(), stream->getPayloadType(),
325 it->encoding_name.c_str(), it->payload_type);
326 if(selected_payload_name == it->encoding_name) {
327 enable_dtmf_transcoding = true;
328 break;
329 }
330 }
331 }
332 }
333 }
334
335 bool AudioStreamData::initStream(PlayoutType playout_type,
336 AmSdp &local_sdp, AmSdp &remote_sdp, int media_idx)
337 {
338 resetStats();
339
340 if (!stream) {
341 initialized = false;
342 return false;
343 }
344
345 // TODO: try to init only in case there are some payloads which can't be relayed
346 stream->forceSdpMediaIndex(media_idx);
347
348 stream->setOnHold(false); // just hack to do correctly mute detection in stream->init
349 if (stream->init(local_sdp, remote_sdp, force_symmetric_rtp) == 0) {
350 stream->setPlayoutType(playout_type);
351 initialized = true;
352
353 // // do not unmute if muted because of 0.0.0.0 remote IP (the mute flag is set during init)
354 // if (!stream->muted()) stream->setOnHold(muted);
355
356 } else {
357 initialized = false;
358 DBG("stream initialization failed\n");
359 // there still can be payloads to be relayed (if all possible payloads are
360 // to be relayed this needs not to be an error)
361 }
362 stream->setOnHold(muted);
363 stream->setReceiving(receiving);
364
365 return initialized;
366 }
367
368 void AudioStreamData::sendDtmf(int event, unsigned int duration_ms)
369 {
370 if (stream) stream->sendDtmf(event,duration_ms);
371 }
372
373 void AudioStreamData::resetStats()
374 {
375 if (outgoing_payload != UNDEFINED_PAYLOAD) {
376 b2b_stats.decCodecWriteUsage(outgoing_payload_name);
377 outgoing_payload = UNDEFINED_PAYLOAD;
378 outgoing_payload_name.clear();
379 }
380 if (incoming_payload != UNDEFINED_PAYLOAD) {
381 b2b_stats.decCodecReadUsage(incoming_payload_name);
382 incoming_payload = UNDEFINED_PAYLOAD;
383 incoming_payload_name.clear();
384 }
385 }
386
387 void AudioStreamData::updateSendStats()
388 {
389 if (!initialized) {
390 resetStats();
391 return;
392 }
393
394 int payload = stream->getPayloadType();
395 if (payload != outgoing_payload) {
396 // payload used to send has changed
397
398 // decrement usage of previous payload if set
399 if (outgoing_payload != UNDEFINED_PAYLOAD)
400 b2b_stats.decCodecWriteUsage(outgoing_payload_name);
401
402 if (payload != UNDEFINED_PAYLOAD) {
403 // remember payload name (in lowercase to simulate case insensitivity)
404 outgoing_payload_name = stream->getPayloadName(payload);
405 transform(outgoing_payload_name.begin(), outgoing_payload_name.end(),
406 outgoing_payload_name.begin(), ::tolower);
407 b2b_stats.incCodecWriteUsage(outgoing_payload_name);
408 }
409 else outgoing_payload_name.clear();
410 outgoing_payload = payload;
411 }
412 }
413
414 void AudioStreamData::updateRecvStats(AmRtpStream *s)
415 {
416 if (!initialized) {
417 resetStats();
418 return;
419 }
420
421 int payload = s->getLastPayload();
422 if (payload != incoming_payload) {
423 // payload used to send has changed
424
425 // decrement usage of previous payload if set
426 if (incoming_payload != UNDEFINED_PAYLOAD)
427 b2b_stats.decCodecReadUsage(incoming_payload_name);
428
429 if (payload != UNDEFINED_PAYLOAD) {
430 // remember payload name (in lowercase to simulate case insensitivity)
431 incoming_payload_name = stream->getPayloadName(payload);
432 transform(incoming_payload_name.begin(), incoming_payload_name.end(),
433 incoming_payload_name.begin(), ::tolower);
434 b2b_stats.incCodecReadUsage(incoming_payload_name);
435 }
436 else incoming_payload_name.clear();
437 incoming_payload = payload;
438 }
439 }
440
441 int AudioStreamData::writeStream(unsigned long long ts, unsigned char *buffer, AudioStreamData &src)
442 {
443 if (!initialized) return 0;
444 if (stream->getOnHold()) return 0; // ignore hold streams?
445
446 unsigned int f_size = stream->getFrameSize();
447 if (stream->sendIntReached(ts)) {
448 // A leg is ready to send data
449 int sample_rate = stream->getSampleRate();
450 int got = 0;
451 if (in) got = in->get(ts, buffer, sample_rate, f_size);
452 else {
453 if (!src.isInitialized()) return 0;
454 AmRtpAudio *src_stream = src.getStream();
455 if (src_stream->checkInterval(ts)) {
456 got = src_stream->get(ts, buffer, sample_rate, f_size);
457 if (got > 0) {
458 updateRecvStats(src_stream);
459 if (dtmf_queue && enable_dtmf_transcoding) {
460 dtmf_queue->putDtmfAudio(buffer, got, ts);
461 }
462 }
463 }
464 }
465 if (got < 0) return -1;
466 if (got > 0) {
467 // we have data to be sent
468 updateSendStats();
469 return stream->put(ts, buffer, sample_rate, got);
470 }
471 }
472 return 0;
473 }
474
475 void AudioStreamData::mute(bool set_mute)
476 {
477 DBG("mute(%s) - RTP stream [%p]\n", set_mute?"true":"false", stream);
478
479 if (stream) {
480 stream->setOnHold(set_mute);
481 if (muted != set_mute) stream->clearRTPTimeout();
482 }
483 muted = set_mute;
484 }
485
486 void AudioStreamData::setReceiving(bool r) {
487 DBG("setReceiving(%s) - RTP stream [%p]\n", r?"true":"false", stream);
488 if (stream) {
489 stream->setReceiving(r);
490 }
491 receiving = r;
492 }
493
494 //////////////////////////////////////////////////////////////////////////////////
495
496 AmB2BMedia::RelayStreamPair::RelayStreamPair(AmB2BSession *_a, AmB2BSession *_b)
497 : a(_a, _a ? _a->getRtpInterface() : -1),
498 b(_b, _b ? _b->getRtpInterface() : -1)
499 {
500 a.enableRawRelay();
501 b.enableRawRelay();
502 }
503
504 AmB2BMedia::AmB2BMedia(AmB2BSession *_a, AmB2BSession *_b):
505 a(_a),
506 b(_b),
507 callgroup(AmSession::getNewId()),
508 have_a_leg_local_sdp(false), have_a_leg_remote_sdp(false),
509 have_b_leg_local_sdp(false), have_b_leg_remote_sdp(false),
510 ref_cnt(0), // everybody who wants to use must add one reference itselves
511 playout_type(ADAPTIVE_PLAYOUT),
512 //playout_type(SIMPLE_PLAYOUT),
513 a_leg_muted(false), b_leg_muted(false),
514 relay_paused(false),
515 logger(NULL)
516 {
517 }
518
519 AmB2BMedia::~AmB2BMedia()
520 {
521 if (logger) dec_ref(logger);
522 }
523
524 void AmB2BMedia::addToMediaProcessor() {
525 addReference(); // AmMediaProcessor's reference
526 AmMediaProcessor::instance()->addSession(this, callgroup);
527 }
528
529 void AmB2BMedia::addToMediaProcessorUnsafe() {
530 ref_cnt++; // AmMediaProcessor's reference
531 AmMediaProcessor::instance()->addSession(this, callgroup);
532 }
533
534 void AmB2BMedia::addReference() {
535 mutex.lock();
536 ref_cnt++;
537 mutex.unlock();
538 }
539
540 bool AmB2BMedia::releaseReference() {
541 mutex.lock();
542 int r = --ref_cnt;
543 mutex.unlock();
544 if (r==0) {
545 DBG("last reference to AmB2BMedia [%p] cleared, destroying\n", this);
546 delete this;
547 }
548 return (r == 0);
549 }
550
551 void AmB2BMedia::changeSession(bool a_leg, AmB2BSession *new_session)
552 {
553 AmLock lock(mutex);
554 changeSessionUnsafe(a_leg, new_session);
555 }
556
557 void AmB2BMedia::changeSessionUnsafe(bool a_leg, AmB2BSession *new_session)
558 {
559 TRACE("changing %s leg session to %p\n", a_leg ? "A" : "B", new_session);
560 if (a_leg) a = new_session;
561 else b = new_session;
562
563 bool needs_processing = a && b && a->getRtpRelayMode() == AmB2BSession::RTP_Transcoding;
564
565 // update all streams
566 for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
567 // stop processing first to avoid unexpected results
568 i->a.stopStreamProcessing();
569 i->b.stopStreamProcessing();
570
571 // replace session
572 if (a_leg) {
573 i->a.changeSession(new_session);
574 }
575 else {
576 i->b.changeSession(new_session);
577 }
578
579 updateStreamPair(*i);
580
581 if (i->requiresProcessing()) needs_processing = true;
582
583 // reset logger (needed if a stream changes)
584 i->setLogger(logger);
585
586 // return back for processing if needed
587 i->a.resumeStreamProcessing();
588 i->b.resumeStreamProcessing();
589 }
590
591 for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) {
592 AmRtpStream &a = (*j)->a;
593 AmRtpStream &b = (*j)->b;
594
595 // FIXME: is stop & resume receiving needed here?
596 if (a_leg)
597 a.changeSession(new_session);
598 else
599 b.changeSession(new_session);
600 }
601
602 if (needs_processing) {
603 if (!isProcessingMedia()) {
604 addToMediaProcessorUnsafe();
605 }
606 }
607 else if (isProcessingMedia()) AmMediaProcessor::instance()->removeSession(this);
608
609 TRACE("session changed\n");
610 }
611
612 int AmB2BMedia::writeStreams(unsigned long long ts, unsigned char *buffer)
613 {
614 int res = 0;
615 AmLock lock(mutex);
616 for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
617 if (i->a.writeStream(ts, buffer, i->b) < 0) { res = -1; break; }
618 if (i->b.writeStream(ts, buffer, i->a) < 0) { res = -1; break; }
619 }
620 return res;
621 }
622
623 void AmB2BMedia::processDtmfEvents()
624 {
625 AmLock lock(mutex);
626 for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
627 i->a.processDtmfEvents();
628 i->b.processDtmfEvents();
629 }
630
631 if (a) a->processDtmfEvents();
632 if (b) b->processDtmfEvents();
633 }
634
635 void AmB2BMedia::sendDtmf(bool a_leg, int event, unsigned int duration_ms)
636 {
637 AmLock lock(mutex);
638 if(!audio.size())
639 return;
640
641 // send the DTMFs using the first available stream
642 if(a_leg) {
643 audio[0].a.sendDtmf(event,duration_ms);
644 }
645 else {
646 audio[0].b.sendDtmf(event,duration_ms);
647 }
648 }
649
650 void AmB2BMedia::clearAudio(bool a_leg)
651 {
652 TRACE("clear %s leg audio\n", a_leg ? "A" : "B");
653 AmLock lock(mutex);
654
655 for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
656 // remove streams from AmRtpReceiver first! (always both?)
657 i->a.stopStreamProcessing();
658 i->b.stopStreamProcessing();
659 if (a_leg) {
660 i->b.setRelayStream(NULL);
661 i->a.clear();
662 }
663 else {
664 i->a.setRelayStream(NULL);
665 i->b.clear();
666 }
667 }
668
669 for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) {
670 if ((*j)->a.hasLocalSocket())
671 AmRtpReceiver::instance()->removeStream((*j)->a.getLocalSocket(), (*j)->a.getLocalPort());
672 if ((*j)->b.hasLocalSocket())
673 AmRtpReceiver::instance()->removeStream((*j)->b.getLocalSocket(), (*j)->b.getLocalPort());
674 }
675
676 // forget sessions to avoid using them once clearAudio is called
677 changeSessionUnsafe(a_leg, NULL);
678
679 if (!a && !b) {
680 audio.clear(); // both legs cleared
681 for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) {
682 delete *j;
683 }
684 relay_streams.clear();
685 }
686 }
687
688 void AmB2BMedia::clearRTPTimeout()
689 {
690 AmLock lock(mutex);
691
692 for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
693 i->a.clearRTPTimeout();
694 i->b.clearRTPTimeout();
695 }
696 }
697
698 bool AmB2BMedia::canRelay(const SdpMedia &m)
699 {
700 return (m.transport == TP_RTPAVP) ||
701 (m.transport == TP_RTPSAVP) ||
702 (m.transport == TP_UDP) ||
703 (m.transport == TP_UDPTL);
704 }
705
706 void AmB2BMedia::createStreams(const AmSdp &sdp)
707 {
708 AudioStreamIterator astreams = audio.begin();
709 RelayStreamIterator rstreams = relay_streams.begin();
710 vector<SdpMedia>::const_iterator m = sdp.media.begin();
711 int idx = 0;
712 bool create_audio = astreams == audio.end();
713 bool create_relay = rstreams == relay_streams.end();
714
715 for (; m != sdp.media.end(); ++m, ++idx) {
716
717 // audio streams
718 if (m->type == MT_AUDIO) {
719 if (create_audio) {
720 AudioStreamPair pair(a, b, idx);
721 pair.a.mute(a_leg_muted);
722 pair.b.mute(b_leg_muted);
723 audio.push_back(pair);
724 audio.back().setLogger(logger);
725 }
726 else if (++astreams == audio.end()) create_audio = true; // we went through the last audio stream
727 }
728
729 // non-audio streams that we can relay
730 else if(canRelay(*m))
731 {
732 if (create_relay) {
733 relay_streams.push_back(new RelayStreamPair(a, b));
734 relay_streams.back()->setLogger(logger);
735 }
736 else if (++rstreams == relay_streams.end()) create_relay = true; // we went through the last relay stream
737 }
738 }
739 }
740
741 void AmB2BMedia::replaceConnectionAddress(AmSdp &parser_sdp, bool a_leg,
742 const string& relay_address,
743 const string& relay_public_address)
744 {
745 AmLock lock(mutex);
746
747 SdpConnection orig_conn = parser_sdp.conn; // needed for the 'quick workaround' for non-audio media
748
749 // place relay_address in connection address
750 if (!parser_sdp.conn.address.empty() && (parser_sdp.conn.address != zero_ip)) {
751 parser_sdp.conn.address = relay_public_address;
752 DBG("new connection address: %s",parser_sdp.conn.address.c_str());
753 }
754
755 // we need to create streams if they are not already created
756 createStreams(parser_sdp);
757
758 string replaced_ports;
759
760 AudioStreamIterator audio_stream_it = audio.begin();
761 RelayStreamIterator relay_stream_it = relay_streams.begin();
762
763 std::vector<SdpMedia>::iterator it = parser_sdp.media.begin();
764 for (; it != parser_sdp.media.end() ; ++it) {
765
766 // FIXME: only UDP streams are handled for now
767 if (it->type == MT_AUDIO) {
768
769 if( audio_stream_it == audio.end() ){
770 // strange... we should actually have a stream for this media line...
771 DBG("audio media line does not have coresponding audio stream...\n");
772 continue;
773 }
774
775 if(it->port) { // if stream active
776 if (!it->conn.address.empty() && (parser_sdp.conn.address != zero_ip)) {
777 it->conn.address = relay_public_address;
778 DBG("new stream connection address: %s",it->conn.address.c_str());
779 }
780 try {
781 if (a_leg) {
782 audio_stream_it->a.setLocalIP(relay_address);
783 it->port = audio_stream_it->a.getLocalPort();
784 replaceRtcpAttr(*it, relay_address, audio_stream_it->a.getLocalRtcpPort());
785 }
786 else {
787 audio_stream_it->b.setLocalIP(relay_address);
788 it->port = audio_stream_it->b.getLocalPort();
789 replaceRtcpAttr(*it, relay_address, audio_stream_it->b.getLocalRtcpPort());
790 }
791 if(!replaced_ports.empty()) replaced_ports += "/";
792 replaced_ports += int2str(it->port);
793 } catch (const string& s) {
794 ERROR("setting port: '%s'\n", s.c_str());
795 throw string("error setting RTP port\n");
796 }
797 }
798 ++audio_stream_it;
799 }
800 else if(canRelay(*it)) {
801
802 if( relay_stream_it == relay_streams.end() ){
803 // strange... we should actually have a stream for this media line...
804 DBG("media line does not have a coresponding relay stream...\n");
805 continue;
806 }
807
808 if(it->port) { // if stream active
809 if (!it->conn.address.empty() && (parser_sdp.conn.address != zero_ip)) {
810 it->conn.address = relay_public_address;
811 DBG("new stream connection address: %s",it->conn.address.c_str());
812 }
813 try {
814 if (a_leg) {
815 if(!(*relay_stream_it)->a.hasLocalSocket()){
816 (*relay_stream_it)->a.setLocalIP(relay_address);
817 }
818 it->port = (*relay_stream_it)->a.getLocalPort();
819 replaceRtcpAttr(*it, relay_address, (*relay_stream_it)->a.getLocalRtcpPort());
820 }
821 else {
822 if(!(*relay_stream_it)->b.hasLocalSocket()){
823 (*relay_stream_it)->b.setLocalIP(relay_address);
824 }
825 it->port = (*relay_stream_it)->b.getLocalPort();
826 replaceRtcpAttr(*it, relay_address, (*relay_stream_it)->b.getLocalRtcpPort());
827 }
828 if(!replaced_ports.empty()) replaced_ports += "/";
829 replaced_ports += int2str(it->port);
830 } catch (const string& s) {
831 ERROR("setting port: '%s'\n", s.c_str());
832 throw string("error setting RTP port\n");
833 }
834 }
835 ++relay_stream_it;
836 }
837 else {
838 // quick workaround to allow direct connection of non-supported streams (i.e.
839 // those which are not relayed or transcoded): propagate connection
840 // address - might work but need not (to be tested with real clients
841 // instead of simulators)
842 if (it->conn.address.empty()) it->conn = orig_conn;
843 continue;
844 }
845 }
846
847 if (it != parser_sdp.media.end()) {
848 // FIXME: create new streams here?
849 WARN("trying to relay SDP with more media lines than "
850 "relay streams initialized (%zu)\n",audio.size()+relay_streams.size());
851 }
852
853 DBG("replaced connection address in SDP with %s:%s.\n",
854 relay_public_address.c_str(), replaced_ports.c_str());
855 }
856
857 void AmB2BMedia::updateStreamPair(AudioStreamPair &pair)
858 {
859 bool have_a = have_a_leg_local_sdp && have_a_leg_remote_sdp;
860 bool have_b = have_b_leg_local_sdp && have_b_leg_remote_sdp;
861
862 TRACE("updating stream in A leg\n");
863 pair.a.setDtmfSink(b);
864 if (pair.b.getInput()) pair.a.setRelayStream(NULL); // don't mix relayed RTP into the other's input
865 else pair.a.setRelayStream(pair.b.getStream());
866 if (have_a) pair.a.initStream(playout_type, a_leg_local_sdp, a_leg_remote_sdp, pair.media_idx);
867
868 TRACE("updating stream in B leg\n");
869 pair.b.setDtmfSink(a);
870 if (pair.a.getInput()) pair.b.setRelayStream(NULL); // don't mix relayed RTP into the other's input
871 else pair.b.setRelayStream(pair.a.getStream());
872 if (have_b) pair.b.initStream(playout_type, b_leg_local_sdp, b_leg_remote_sdp, pair.media_idx);
873
874 TRACE("audio streams updated\n");
875 }
876
877 void AmB2BMedia::updateAudioStreams()
878 {
879 // SDP was updated
880 TRACE("handling SDP change, A leg: %c%c, B leg: %c%c\n",
881 have_a_leg_local_sdp ? 'X' : '-',
882 have_a_leg_remote_sdp ? 'X' : '-',
883 have_b_leg_local_sdp ? 'X' : '-',
884 have_b_leg_remote_sdp ? 'X' : '-');
885
886 // if we have all necessary information we can initialize streams and start
887 // their processing
888 if (audio.empty() && relay_streams.empty()) return; // no streams
889
890 bool have_a = have_a_leg_local_sdp && have_a_leg_remote_sdp;
891 bool have_b = have_b_leg_local_sdp && have_b_leg_remote_sdp;
892
893 if (!(
894 (have_a || have_b)
895 )) return;
896
897 bool needs_processing = a && b && a->getRtpRelayMode() == AmB2BSession::RTP_Transcoding;
898
899 // initialize streams to be able to relay & transcode (or use local audio)
900 for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
901 i->a.stopStreamProcessing();
902 i->b.stopStreamProcessing();
903
904 updateStreamPair(*i);
905
906 if (i->requiresProcessing()) needs_processing = true;
907
908 i->a.resumeStreamProcessing();
909 i->b.resumeStreamProcessing();
910 }
911
912 // start media processing (only if transcoding or regular audio processing
913 // required)
914 // Note: once we send local SDP to the other party we have to expect RTP but
915 // we need to be fully initialised (both legs) before we can correctly handle
916 // the media, right?
917 if (needs_processing) {
918 if (!isProcessingMedia()) {
919 addToMediaProcessorUnsafe();
920 }
921 }
922 else if (isProcessingMedia()) AmMediaProcessor::instance()->removeSession(this);
923 }
924
925 void AmB2BMedia::updateRelayStream(AmRtpStream *stream, AmB2BSession *session,
926 const string& connection_address,
927 const SdpMedia &m, AmRtpStream *relay_to)
928 {
929 static const PayloadMask true_mask(true);
930
931 stream->stopReceiving();
932 if(m.port) {
933 if (session) {
934 // propagate session settings
935 stream->setPassiveMode(session->getRtpRelayForceSymmetricRtp());
936 stream->setRtpRelayTransparentSeqno(session->getRtpRelayTransparentSeqno());
937 stream->setRtpRelayTransparentSSRC(session->getRtpRelayTransparentSSRC());
938 // if (!stream->hasLocalSocket()) stream->setLocalIP(session->advertisedIP());
939 }
940 stream->setRelayStream(relay_to);
941 stream->setRelayPayloads(true_mask);
942 if (!relay_paused)
943 stream->enableRtpRelay();
944 stream->setRAddr(connection_address,m.port,m.port+1);
945 if((m.transport != TP_RTPAVP) && (m.transport != TP_RTPSAVP))
946 stream->enableRawRelay();
947 stream->resumeReceiving();
948 }
949 else {
950 DBG("disabled stream");
951 }
952 }
953
954 void AmB2BMedia::updateStreams(bool a_leg, const AmSdp &local_sdp, const AmSdp &remote_sdp, RelayController *ctrl)
955 {
956 TRACE("%s (%c): updating streams with local & remote SDP\n",
957 a_leg ? (a ? a->getLocalTag().c_str() : "NULL") : (b ? b->getLocalTag().c_str() : "NULL"),
958 a_leg ? 'A': 'B');
959
960 /*string s;
961 local_sdp.print(s);
962 INFO("local SDP: %s\n", s.c_str());
963 remote_sdp.print(s);
964 INFO("remote SDP: %s\n", s.c_str());*/
965
966 AmLock lock(mutex);
967 // streams should be created already (replaceConnectionAddress called
968 // before updateLocalSdp uses/assignes their port numbers)
969
970 // save SDP: FIXME: really needed to store instead of just to use?
971 if (a_leg) {
972 a_leg_local_sdp = local_sdp;
973 a_leg_remote_sdp = remote_sdp;
974 have_a_leg_local_sdp = true;
975 have_a_leg_remote_sdp = true;
976 }
977 else {
978 b_leg_local_sdp = local_sdp;
979 b_leg_remote_sdp = remote_sdp;
980 have_b_leg_local_sdp = true;
981 have_b_leg_remote_sdp = true;
982 }
983
984 // create missing streams
985 createStreams(local_sdp); // FIXME: remote_sdp?
986
987 // compute relay mask for every stream
988 // Warning: do not apply the new mask unless the offer answer succeeds?
989 // we can safely apply the changes once we have local & remote SDP (i.e. the
990 // negotiation is finished) otherwise we might handle the RTP in a wrong way
991
992 AudioStreamIterator astream = audio.begin();
993 RelayStreamIterator rstream = relay_streams.begin();
994 int local_media_count = std::distance(local_sdp.media.begin(),
995 local_sdp.media.end());
996 for (vector<SdpMedia>::const_iterator m = remote_sdp.media.begin(); m != remote_sdp.media.end(); ++m) {
997 if (local_media_count == 0) break;
998 const string& connection_address = (m->conn.address.empty() ? remote_sdp.conn.address : m->conn.address);
999
1000 if (m->type == MT_AUDIO) {
1001 // initialize relay mask in the other(!) leg and relay destination for stream in current leg
1002 TRACE("relay payloads in direction %s\n", a_leg ? "B -> A" : "A -> B");
1003 if (a_leg) {
1004 astream->b.setRelayPayloads(*m, ctrl);
1005 astream->a.setRelayDestination(connection_address, m->port);
1006 }
1007 else {
1008 astream->a.setRelayPayloads(*m, ctrl);
1009 astream->b.setRelayDestination(connection_address, m->port);
1010 }
1011 ++astream;
1012 }
1013
1014 else {
1015 if (!canRelay(*m)) continue;
1016 if (rstream == relay_streams.end()) continue;
1017
1018 RelayStreamPair& relay_stream = **rstream;
1019
1020 if(a_leg) {
1021 DBG("updating A-leg relay_stream");
1022 updateRelayStream(&relay_stream.a, a, connection_address, *m, &relay_stream.b);
1023 }
1024 else {
1025 DBG("updating B-leg relay_stream");
1026 updateRelayStream(&relay_stream.b, b, connection_address, *m, &relay_stream.a);
1027 }
1028 ++rstream;
1029 }
1030
1031 local_media_count--;
1032 }
1033
1034 updateAudioStreams();
1035
1036 TRACE("streams updated with SDP\n");
1037 }
1038
1039 void AmB2BMedia::stop(bool a_leg)
1040 {
1041 TRACE("stop %s leg\n", a_leg ? "A" : "B");
1042 clearAudio(a_leg);
1043 // remove from processor only if both A and B leg stopped
1044 if (isProcessingMedia() && (!a) && (!b)) {
1045 AmMediaProcessor::instance()->removeSession(this);
1046 }
1047 }
1048
1049 void AmB2BMedia::onMediaProcessingTerminated()
1050 {
1051 AmMediaSession::onMediaProcessingTerminated();
1052
1053 // release reference held by AmMediaProcessor
1054 releaseReference();
1055 }
1056
1057 bool AmB2BMedia::replaceOffer(AmSdp &sdp, bool a_leg)
1058 {
1059 TRACE("replacing offer with a local one\n");
1060 createStreams(sdp); // create missing streams
1061
1062 AmLock lock(mutex);
1063
1064 try {
1065
1066 AudioStreamIterator as = audio.begin();
1067 for (vector<SdpMedia>::iterator m = sdp.media.begin(); m != sdp.media.end(); ++m) {
1068 if (m->type == MT_AUDIO && as != audio.end()) {
1069 // generate our local offer
1070 TRACE("... making audio stream offer\n");
1071 if (a_leg) as->a.getSdpOffer(as->media_idx, *m);
1072 else as->b.getSdpOffer(as->media_idx, *m);
1073 ++as;
1074 }
1075 else {
1076 TRACE("... making non-audio/uninitialised stream inactive\n");
1077 m->send = false;
1078 m->recv = false;
1079 }
1080 }
1081
1082 }
1083 catch (...) {
1084 TRACE("hold SDP offer creation failed\n");
1085 return true;
1086 }
1087
1088 TRACE("hold SDP offer generated\n");
1089
1090 return true;
1091 }
1092
1093 void AmB2BMedia::setMuteFlag(bool a_leg, bool set)
1094 {
1095 AmLock lock(mutex);
1096 if (a_leg) a_leg_muted = set;
1097 else b_leg_muted = set;
1098 for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
1099 if (a_leg) i->a.mute(set);
1100 else i->b.mute(set);
1101 }
1102 }
1103
1104 void AmB2BMedia::setFirstStreamInput(bool a_leg, AmAudio *in)
1105 {
1106 AmLock lock(mutex);
1107 //for ( i != audio.end(); ++i) {
1108 if (!audio.empty()) {
1109 AudioStreamIterator i = audio.begin();
1110 if (a_leg) i->a.setInput(in);
1111 else i->b.setInput(in);
1112 updateAudioStreams();
1113 }
1114 else {
1115 if (in) {
1116 ERROR("BUG: can't set %s leg's first stream input, no streams\n", a_leg ? "A": "B");
1117 }
1118 }
1119 }
1120
1121 void AmB2BMedia::createHoldAnswer(bool a_leg, const AmSdp &offer, AmSdp &answer, bool use_zero_con)
1122 {
1123 // because of possible RTP relaying our payloads need not to match the remote
1124 // party's payloads (i.e. we might need not understand the remote party's
1125 // codecs)
1126 // As a quick hack we may use just copy of the original SDP with all streams
1127 // deactivated to avoid sending RTP to us (twinkle requires at least one
1128 // non-disabled stream in the response so we can not set all ports to 0 to
1129 // signalize that we don't want to receive anything)
1130
1131 AmLock lock(mutex);
1132
1133 answer = offer;
1134 answer.media.clear();
1135
1136 if (use_zero_con) answer.conn.address = zero_ip;
1137 else {
1138 if (a_leg) { if (a) answer.conn.address = a->advertisedIP(); }
1139 else { if (b) answer.conn.address = b->advertisedIP(); }
1140
1141 if (answer.conn.address.empty()) answer.conn.address = zero_ip; // we need something there
1142 }
1143
1144 AudioStreamIterator i = audio.begin();
1145 vector<SdpMedia>::const_iterator m;
1146 for (m = offer.media.begin(); m != offer.media.end(); ++m) {
1147 answer.media.push_back(SdpMedia());
1148 SdpMedia &media = answer.media.back();
1149 media.type = m->type;
1150
1151 if (media.type != MT_AUDIO) { media = *m ; media.port = 0; continue; } // copy whole media line except port
1152 if (m->port == 0) { media = *m; ++i; continue; } // copy whole inactive media line
1153
1154 if (a_leg) i->a.getSdpAnswer(i->media_idx, *m, media);
1155 else i->b.getSdpAnswer(i->media_idx, *m, media);
1156
1157 media.send = false; // should be already because the stream should be on hold
1158 media.recv = false; // what we would do with received data?
1159
1160 if (media.payloads.empty()) {
1161 // we have to add something there
1162 if (!m->payloads.empty()) media.payloads.push_back(m->payloads[0]);
1163 }
1164 break;
1165 }
1166 }
1167
1168 void AmB2BMedia::setRtpLogger(msg_logger* _logger)
1169 {
1170 if (logger) dec_ref(logger);
1171 logger = _logger;
1172 if (logger) inc_ref(logger);
1173
1174 // walk through all the streams and use logger for them
1175 for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) i->setLogger(logger);
1176 for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); ++j) (*j)->setLogger(logger);
1177 }
1178
1179 void AmB2BMedia::setRelayDTMFReceiving(bool enabled) {
1180 DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size());
1181 for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) {
1182 DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", &(*j)->a);
1183 DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", &(*j)->b);
1184 (*j)->a.force_receive_dtmf = enabled;
1185 (*j)->b.force_receive_dtmf = enabled;
1186 }
1187
1188 for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) {
1189 DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", j->a.getStream());
1190 DBG("force_receive_dtmf %sabled for [%p]\n", enabled?"en":"dis", j->b.getStream());
1191 if (NULL != j->a.getStream())
1192 j->a.getStream()->force_receive_dtmf = enabled;
1193
1194 if (NULL != j->b.getStream())
1195 j->b.getStream()->force_receive_dtmf = enabled;
1196 }
1197 }
1198
1199 /** set receving of RTP/relay streams (not receiving=drop incoming packets) */
1200 void AmB2BMedia::setReceiving(bool receiving_a, bool receiving_b) {
1201 AmLock lock(mutex); // TODO: is this necessary?
1202
1203 DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size());
1204 for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) {
1205 DBG("setReceiving(%s) A relay stream [%p]\n", receiving_a?"true":"false", &(*j)->a);
1206 (*j)->a.setReceiving(receiving_a);
1207 DBG("setReceiving(%s) B relay stream [%p]\n", receiving_b?"true":"false", &(*j)->b);
1208 (*j)->b.setReceiving(receiving_b);
1209 }
1210
1211 for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) {
1212 DBG("setReceiving(%s) A audio stream [%p]\n", receiving_a?"true":"false", j->a.getStream());
1213 j->a.setReceiving(receiving_a);
1214 DBG("setReceiving(%s) B audio stream [%p]\n", receiving_b?"true":"false", j->b.getStream());
1215 j->b.setReceiving(receiving_b);
1216 }
1217
1218 }
1219
1220 void AmB2BMedia::pauseRelay() {
1221 DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size());
1222 relay_paused = true;
1223 for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) {
1224 (*j)->a.disableRawRelay();
1225 (*j)->b.disableRawRelay();
1226 }
1227
1228 for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) {
1229 j->a.setRelayPaused(true);
1230 j->b.setRelayPaused(true);
1231 }
1232 }
1233
1234 void AmB2BMedia::restartRelay() {
1235 DBG("relay_streams.size() = %zd, audio_streams.size() = %zd\n", relay_streams.size(), audio.size());
1236 relay_paused = false;
1237 for (RelayStreamIterator j = relay_streams.begin(); j != relay_streams.end(); j++) {
1238 (*j)->a.enableRawRelay();
1239 (*j)->b.enableRawRelay();
1240 }
1241
1242 for (AudioStreamIterator j = audio.begin(); j != audio.end(); j++) {
1243 j->a.setRelayPaused(false);
1244 j->b.setRelayPaused(false);
1245 }
1246 }
1247
1248 void AudioStreamData::debug()
1249 {
1250 DBG("\tmuted: %s\n", muted ? "yes" : "no");
1251 if(stream) {
1252 stream->debug();
1253 }
1254 else
1255 DBG("\t<null> <-> <null>");
1256 }
1257
1258 // print debug info
1259 void AmB2BMedia::debug()
1260 {
1261 // walk through all the streams
1262 DBG("B2B media session %p ('%s' <-> '%s'):",
1263 this,
1264 a ? a->getLocalTag().c_str() : "?",
1265 b ? b->getLocalTag().c_str() : "?");
1266 DBG("\tOA status: %c%c / %c%c",
1267 have_a_leg_local_sdp ? 'X' : '-',
1268 have_a_leg_remote_sdp ? 'X' : '-',
1269 have_b_leg_local_sdp ? 'X' : '-',
1270 have_b_leg_remote_sdp ? 'X' : '-');
1271
1272 for (AudioStreamIterator i = audio.begin(); i != audio.end(); ++i) {
1273 DBG(" - audio stream (A):\n");
1274 i->a.debug();
1275 DBG(" - audio stream (B):\n");
1276 i->b.debug();
1277 }
1278
1279 for (RelayStreamIterator j = relay_streams.begin();
1280 j != relay_streams.end(); ++j) {
1281
1282 DBG(" - relay stream (A):\n");
1283 (*j)->a.debug();
1284 DBG(" - relay stream (B):\n");
1285 (*j)->b.debug();
1286 }
1287 }
1288