1 /**
2  * yrtpchan.cpp
3  * This file is part of the YATE Project http://YATE.null.ro
4  *
5  * RTP channel - also acts as data helper for other protocols
6  *
7  * Yet Another Telephony Engine - a fully featured software PBX and IVR
8  * Copyright (C) 2004-2014 Null Team
9  *
10  * This software is distributed under multiple licenses;
11  * see the COPYING file in the main directory for licensing
12  * information for this specific distribution.
13  *
14  * This use of this software may be subject to additional restrictions.
15  * See the LEGAL file in the main directory for details.
16  *
17  * This program is distributed in the hope that it will be useful,
18  * but WITHOUT ANY WARRANTY; without even the implied warranty of
19  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
20  */
21 
22 
23 #include <yatephone.h>
24 #include <yatertp.h>
25 
26 #include <string.h>
27 #include <stdlib.h>
28 
29 #define MIN_PORT 16384
30 #define MAX_PORT 32768
31 #define BUF_SIZE 240
32 #define BUF_PREF 160
33 
34 using namespace TelEngine;
35 namespace { // anonymous
36 
37 /* Payloads for the AV profile */
38 static TokenDict dict_payloads[] = {
39     { "mulaw",         0 },
40     { "alaw",          8 },
41     { "gsm",           3 },
42     { "lpc10",         7 },
43     { "2*slin",       10 },
44     { "slin",         11 },
45     { "g726",          2 },
46     { "g722/16000",    9 },
47     { "g723",          4 },
48     { "g728",         15 },
49     { "g729",         18 },
50     { "mpa",          14 },
51     { "ilbc",         98 },
52     { "ilbc20",       98 },
53     { "ilbc30",       98 },
54     { "amr",          96 },
55     { "amr-o",        96 },
56     { "amr/16000",    99 },
57     { "amr-o/16000",  99 },
58     { "speex",       102 },
59     { "speex/16000", 103 },
60     { "speex/32000", 104 },
61     { "isac/16000",  105 },
62     { "isac/32000",  106 },
63     { "gsm-efr",     107 },
64     { "mjpeg",        26 },
65     { "h261",         31 },
66     { "h263",         34 },
67     { "h263-1998",   111 },
68     { "h263-2000",   112 },
69     { "h264",        114 },
70     { "vp8",         113 },
71     { "vp9",         115 },
72     { "mpv",          32 },
73     { "mp2t",         33 },
74     { "mp4v",        110 },
75     { "2*mulaw",     117 },
76     { "2*alaw",      118 },
77     { 0 ,              0 },
78 };
79 
80 static TokenDict dict_yrtp_dir[] = {
81     { "receive", RTPSession::RecvOnly },
82     { "send", RTPSession::SendOnly },
83     { "bidir", RTPSession::SendRecv },
84     { 0 , 0 },
85 };
86 
87 static bool s_ipv6 = false;              // IPv6 support enabled
88 static int s_minport = MIN_PORT;
89 static int s_maxport = MAX_PORT;
90 static int s_bufsize = BUF_SIZE;
91 static int s_padding = 0;
92 static String s_localip;
93 static String s_notifyMsg;
94 static bool s_autoaddr  = true;
95 static bool s_anyssrc   = false;
96 static bool s_warnFirst = false;
97 static bool s_warnLater = false;
98 static bool s_monitor   = false;
99 static bool s_rtcp  = true;
100 static bool s_drill = false;
101 
102 static Thread::Priority s_priority = Thread::Normal;
103 static String s_affinity;
104 static int s_tos     = Socket::Normal;
105 static int s_udpbuf  = 0;
106 static int s_sleep   = 5;
107 static int s_interval= 0;
108 static int s_timeout = 0;
109 static int s_udptlTimeout = 0;
110 
111 static int s_minJitter = 0;
112 static int s_maxJitter = 0;
113 
114 class YRTPSource;
115 class YRTPConsumer;
116 class YRTPSession;
117 class YUDPTLSession;
118 class YRTPReflector;
119 
120 class YRTPWrapper : public RefObject
121 {
122     friend class YRTPSource;
123     friend class YRTPConsumer;
124     friend class YRTPSession;
125     friend class YUDPTLSession;
126 public:
127     YRTPWrapper(const char *localip, CallEndpoint* conn, const char* media,
128 	RTPSession::Direction direction, Message& msg, bool udptl = false, bool ipv6 = false);
129     ~YRTPWrapper();
130     virtual void* getObject(const String& name) const;
traceId() const131     virtual const String& traceId() const
132 	{ return m_traceId; }
133     bool setParams(const char* raddr, Message& msg);
134     bool setRemote(const char* raddr, unsigned int rport, const Message& msg);
135     void setFaxDivert(const Message& msg);
136     bool sendDTMF(char dtmf, int duration = 0);
137     void gotDTMF(char tone);
138     void gotFax();
139     void timeout(bool initial);
rtp() const140     inline YRTPSession* rtp() const
141 	{ return m_rtp; }
udptl() const142     inline YUDPTLSession* udptl() const
143 	{ return m_udptl; }
session() const144     inline UDPSession* session() const
145 	{ return m_rtp ? (UDPSession*)m_rtp : (UDPSession*)m_udptl; }
dir() const146     inline RTPSession::Direction dir() const
147 	{ return m_dir; }
conn() const148     inline CallEndpoint* conn() const
149 	{ return m_conn; }
id() const150     inline const String& id() const
151 	{ return m_id; }
callId() const152     inline const String& callId() const
153 	{ return m_master; }
media() const154     inline const String& media() const
155 	{ return m_media; }
host() const156     inline const String& host() const
157 	{ return m_host; }
bufSize() const158     inline unsigned int bufSize() const
159 	{ return m_bufsize; }
port() const160     inline unsigned int port() const
161 	{ return m_port; }
setMaster(const char * master)162     inline void setMaster(const char* master)
163 	{ if (master) m_master = master; }
isAudio() const164     inline bool isAudio() const
165 	{ return m_audio; }
valid() const166     inline bool valid() const
167 	{ return m_valid; }
168     DataSource* getSource();
169     DataConsumer* getConsumer();
170     void addDirection(RTPSession::Direction direction);
171     void terminate(Message& msg);
172     static YRTPWrapper* find(const CallEndpoint* conn, const String& media);
173     static YRTPWrapper* find(const String& id);
174     static void guessLocal(const char* remoteip, String& localip, bool ipv6,
175 			const String& traceId = String::empty());
176 private:
177     void setupRTP(const char* localip, bool rtcp, bool warnSeq);
178     void setupUDPTL(const char* localip, u_int16_t maxLen = 250, u_int8_t maxSec = 2);
179     bool bindLocal(const char* localip, bool rtcp);
180     bool startRTP(const char* raddr, unsigned int rport, Message& msg);
181     bool startUDPTL(const char* raddr, unsigned int rport, Message& msg);
182     bool setupSRTP(Message& msg, bool buildMaster);
183     bool startSRTP(const String& suite, const String& keyParams, const ObjList* paramList);
184     bool setupUDPTL(Message& msg);
185     void setTimeout(const Message& msg, int timeOut);
186     static void setFormat(DataFormat& dest, const char* format, const NamedList& params);
187     YRTPSession* m_rtp;
188     YUDPTLSession* m_udptl;
189     RTPSession::Direction m_dir;
190     CallEndpoint* m_conn;
191     YRTPSource* m_source;
192     YRTPConsumer* m_consumer;
193     String m_id;
194     String m_media;
195     String m_format;
196     String m_master;
197     String m_faxDivert;
198     String m_faxCaller;
199     String m_faxCalled;
200     String m_host;
201     unsigned int m_bufsize;
202     unsigned int m_port;
203     bool m_audio;
204     bool m_valid;
205     bool m_ipv6;
206 
207     unsigned int m_noAudio;
208     unsigned int m_lostAudio;
209     String m_traceId;
210 };
211 
212 class YRTPSession : public RTPSession
213 {
214 public:
215     YRTPSession(YRTPWrapper* wrap);
216     virtual ~YRTPSession();
217     virtual bool rtpRecvData(bool marker, unsigned int timestamp,
218 	const void* data, int len);
219     virtual bool rtpRecvEvent(int event, char key, int duration,
220 	int volume, unsigned int timestamp);
221     virtual void rtpNewPayload(int payload, unsigned int timestamp);
222     virtual void rtpNewSSRC(u_int32_t newSsrc, bool marker);
223     virtual Cipher* createCipher(const String& name, Cipher::Direction dir);
224     virtual bool checkCipher(const String& name);
resync()225     inline void resync()
226 	{ m_resync = true; }
anySSRC(bool acceptAny=true)227     inline void anySSRC(bool acceptAny = true)
228 	{ m_anyssrc = acceptAny; }
229 protected:
230     virtual void timeout(bool initial);
231 private:
232     YRTPWrapper* m_wrap;
233     u_int32_t m_lastLost;
234     int m_newPayload;
235     bool m_resync;
236     bool m_anyssrc;
237     bool m_getFax;
238 };
239 
240 class YUDPTLSession : public UDPTLSession
241 {
242 public:
YUDPTLSession(YRTPWrapper * wrap,u_int16_t maxLen,u_int8_t maxSec)243     inline YUDPTLSession(YRTPWrapper* wrap, u_int16_t maxLen, u_int8_t maxSec)
244 	: UDPTLSession(maxLen,maxSec),
245 	  m_wrap(wrap)
246 	{ }
247     virtual ~YUDPTLSession();
traceId() const248     virtual const String& traceId() const
249 	{ return m_wrap ? m_wrap->traceId() : String::empty(); }
250 protected:
251     virtual void udptlRecv(const void* data, int len, u_int16_t seq, bool recovered);
252     virtual void timeout(bool initial);
253 private:
254     YRTPWrapper* m_wrap;
255 };
256 
257 class YRTPSource : public DataSource
258 {
259     friend class YRTPWrapper;
260 public:
261     YRTPSource(YRTPWrapper* wrap);
262     ~YRTPSource();
valid() const263     virtual bool valid() const
264 	{ return m_wrap && m_wrap->valid(); }
busy(bool isBusy)265     inline void busy(bool isBusy)
266 	{ m_busy = isBusy; }
267 private:
268     YRTPWrapper* m_wrap;
269     volatile bool m_busy;
270 };
271 
272 class YRTPConsumer : public DataConsumer
273 {
274     friend class YRTPWrapper;
275 public:
276     YRTPConsumer(YRTPWrapper* wrap);
277     ~YRTPConsumer();
valid() const278     virtual bool valid() const
279 	{ return m_wrap && m_wrap->valid(); }
280     virtual unsigned long Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags);
setSplitable()281     inline void setSplitable()
282 	{ m_splitable = (m_format == YSTRING("alaw")) || (m_format == YSTRING("mulaw")); }
283 private:
284     YRTPWrapper* m_wrap;
285     bool m_splitable;
286 };
287 
288 class YRTPMonitor : public RTPProcessor
289 {
290 public:
YRTPMonitor(const String * id=0)291     inline YRTPMonitor(const String* id = 0)
292 	: m_id(id),
293 	  m_rtpPackets(0), m_rtcpPackets(0), m_rtpBytes(0),
294 	  m_payload(-1), m_start(0), m_last(0)
295 	{ }
296     virtual void rtpData(const void* data, int len);
297     virtual void rtcpData(const void* data, int len);
298     void startup();
299     void saveStats(Message& msg) const;
300 protected:
301     void updateTimes(u_int64_t when);
302     void timerTick(const Time& when);
303     void timeout(bool initial);
304     const String* m_id;
305     unsigned int m_rtpPackets;
306     unsigned int m_rtcpPackets;
307     unsigned int m_rtpBytes;
308     int m_payload;
309     u_int64_t m_start;
310     u_int64_t m_last;
311 };
312 
313 class YRTPReflector : public GenObject
314 {
315 public:
316     YRTPReflector(const String& id, bool passiveA, bool passiveB);
317     virtual ~YRTPReflector();
idA() const318     inline const String& idA() const
319 	{ return m_idA; }
idB() const320     inline const String& idB() const
321 	{ return m_idB; }
rtpA() const322     inline RTPTransport& rtpA() const
323 	{ return *m_rtpA; }
rtpB() const324     inline RTPTransport& rtpB() const
325 	{ return *m_rtpB; }
monA() const326     inline YRTPMonitor& monA() const
327 	{ return *m_monA; }
monB() const328     inline YRTPMonitor& monB() const
329 	{ return *m_monB; }
setA(const String & id)330     inline void setA(const String& id)
331 	{ m_idA = id; }
setB(const String & id)332     inline void setB(const String& id)
333 	{ m_idB = id; }
334 private:
335     RTPGroup* m_group;
336     RTPTransport* m_rtpA;
337     RTPTransport* m_rtpB;
338     YRTPMonitor* m_monA;
339     YRTPMonitor* m_monB;
340     String m_idA;
341     String m_idB;
342 };
343 
344 class CipherHolder : public RefObject
345 {
346 public:
CipherHolder()347     inline CipherHolder()
348 	: m_cipher(0)
349 	{ }
~CipherHolder()350     virtual ~CipherHolder()
351 	{ TelEngine::destruct(m_cipher); }
getObject(const String & name) const352     virtual void* getObject(const String& name) const
353 	{ return (name == YATOM("Cipher*")) ? (void*)&m_cipher : RefObject::getObject(name); }
cipher()354     inline Cipher* cipher()
355 	{ Cipher* tmp = m_cipher; m_cipher = 0; return tmp; }
356 private:
357     Cipher* m_cipher;
358 };
359 
360 class YRTPPlugin : public Module
361 {
362 public:
363     YRTPPlugin();
364     virtual ~YRTPPlugin();
365     virtual void initialize();
366     virtual bool received(Message& msg, int id);
367     virtual void statusParams(String& str);
368     virtual void statusDetail(String& str);
369     virtual void genUpdate(Message& msg);
370 
371 private:
372     bool reflectSetup(Message& msg, const char* id, RTPTransport& rtp, const char* rHost, const char* leg);
373     bool reflectStart(Message& msg, const char* id, RTPTransport& rtp, SocketAddr& rAddr);
374     void reflectDrop(YRTPReflector*& refl, Lock& mylock);
375     void reflectExecute(Message& msg);
376     void reflectAnswer(Message& msg, bool ignore);
377     void reflectHangup(Message& msg);
378     bool m_first;
379 };
380 
381 static YRTPPlugin splugin;
382 
383 class AttachHandler : public MessageHandler
384 {
385 public:
AttachHandler()386     AttachHandler() : MessageHandler("chan.attach",100,splugin.name()) { }
387     virtual bool received(Message &msg);
388 };
389 
390 class RtpHandler : public MessageHandler
391 {
392 public:
RtpHandler()393     RtpHandler() : MessageHandler("chan.rtp",100,splugin.name()) { }
394     virtual bool received(Message &msg);
395 };
396 
397 class DTMFHandler : public MessageHandler
398 {
399 public:
DTMFHandler()400     DTMFHandler() : MessageHandler("chan.dtmf",150,splugin.name()) { }
401     virtual bool received(Message &msg);
402 };
403 
404 static ObjList s_calls;
405 static ObjList s_mirrors;
406 static Mutex s_mutex(false,"YRTPChan");
407 static Mutex s_refMutex(false,"YRTPChan::reflect");
408 static Mutex s_srcMutex(false,"YRTPChan::source");
409 static bool s_rtpWarnSeq = true;         // Warn on invalid rtp sequence number
410 
411 
YRTPWrapper(const char * localip,CallEndpoint * conn,const char * media,RTPSession::Direction direction,Message & msg,bool udptl,bool ipv6)412 YRTPWrapper::YRTPWrapper(const char* localip, CallEndpoint* conn, const char* media,
413     RTPSession::Direction direction, Message& msg, bool udptl, bool ipv6)
414     : m_rtp(0), m_udptl(0), m_dir(direction), m_conn(conn),
415       m_source(0), m_consumer(0), m_media(media),
416       m_bufsize(0), m_port(0), m_valid(true), m_ipv6(ipv6), m_noAudio(0), m_lostAudio(0),
417       m_traceId(msg.getValue(YSTRING("trace_id")))
418 {
419     TraceDebug(m_traceId,&splugin,DebugAll,"YRTPWrapper::YRTPWrapper('%s',%p,'%s',%s,%p,%s) [%p]",
420 	localip,conn,media,lookup(direction,dict_yrtp_dir),
421 	&msg,String::boolText(udptl),this);
422     m_id = udptl ? "udptl/" : "yrtp/";
423     m_id << (unsigned int)Random::random();
424     if (conn)
425 	m_master = conn->id();
426     m_audio = !udptl && (m_media == YSTRING("audio"));
427     s_mutex.lock();
428     s_calls.append(this);
429     if (udptl) {
430 	int md = 0xffff & msg.getIntValue(YSTRING("sdp_T38FaxMaxDatagram"));
431 	md = msg.getIntValue(YSTRING("t38maxdatagram"),md);
432 	if (md < 96)
433 	    md = 250;
434 	const String* ec = msg.getParam(YSTRING("sdp_T38FaxUdpEC"));
435 	int ms = (ec && (*ec == YSTRING("t38UDPRedundancy"))) ? 2 : 0;
436 	ms = msg.getIntValue(YSTRING("t38redundancy"),ms);
437 	if (ms < 0)
438 	    ms = 0;
439 	else if (ms > 16)
440 	    ms = 16;
441 	m_format = "t38";
442 	setupUDPTL(localip,md,ms);
443     }
444     else
445 	setupRTP(localip,msg.getBoolValue(YSTRING("rtcp"),s_rtcp),
446 	    msg.getBoolValue(YSTRING("rtp_warn_seq"),s_rtpWarnSeq));
447     splugin.changed();
448     s_mutex.unlock();
449 }
450 
~YRTPWrapper()451 YRTPWrapper::~YRTPWrapper()
452 {
453     TraceDebug(m_traceId,&splugin,DebugAll,"YRTPWrapper::~YRTPWrapper() %s '%s' [%p]",
454 	lookup(m_dir,dict_yrtp_dir),m_media.c_str(),this);
455     s_mutex.lock();
456     s_calls.remove(this,false);
457     if (m_rtp) {
458 	TraceDebug(m_traceId,DebugAll,"Cleaning up RTP %p [%p]",m_rtp,this);
459 	if (s_monitor) {
460 	    Message* m = new Message("module.update");
461 	    m->addParam("module",splugin.name());
462 	    m_rtp->getStats(*m);
463 	    m->setParam("noaudio",String(m_noAudio));
464 	    m->setParam("lostaudio",String(m_lostAudio));
465 	    Engine::enqueue(m);
466 	}
467 	TelEngine::destruct(m_rtp);
468     }
469     if (m_udptl) {
470 	TraceDebug(m_traceId,DebugAll,"Cleaning up UDPTL %p [%p]",m_udptl,this);
471 	TelEngine::destruct(m_udptl);
472     }
473     if (m_source) {
474 	TraceDebug(m_traceId,&splugin,DebugCrit,"There is still a RTP source %p [%p]",m_source,this);
475 	TelEngine::destruct(m_source);
476     }
477     if (m_consumer) {
478 	TraceDebug(m_traceId,&splugin,DebugCrit,"There is still a RTP consumer %p [%p]",m_consumer,this);
479 	TelEngine::destruct(m_consumer);
480     }
481     splugin.changed();
482     s_mutex.unlock();
483 }
484 
getObject(const String & name) const485 void* YRTPWrapper::getObject(const String& name) const
486 {
487     if (name == YATOM("Socket"))
488 	return m_rtp ? m_rtp->rtpSock() : 0;
489     if (name == YATOM("DataSource"))
490 	return m_source;
491     if (name == YATOM("DataConsumer"))
492 	return m_consumer;
493     if (name == YATOM("RTPSession"))
494 	return m_rtp;
495     return RefObject::getObject(name);
496 }
497 
find(const CallEndpoint * conn,const String & media)498 YRTPWrapper* YRTPWrapper::find(const CallEndpoint* conn, const String& media)
499 {
500     if (!conn)
501 	return 0;
502     Lock lock(s_mutex);
503     ObjList* l = &s_calls;
504     for (; l; l=l->next()) {
505 	YRTPWrapper *p = static_cast<YRTPWrapper *>(l->get());
506 	if (p && (p->conn() == conn) && (p->media() == media))
507 	    return p->ref() ? p : 0;
508     }
509     return 0;
510 }
511 
find(const String & id)512 YRTPWrapper* YRTPWrapper::find(const String& id)
513 {
514     if (id.null())
515 	return 0;
516     Lock lock(s_mutex);
517     ObjList* l = &s_calls;
518     for (; l; l=l->next()) {
519 	YRTPWrapper *p = static_cast<YRTPWrapper *>(l->get());
520 	if (p && (p->id() == id))
521 	    return p->ref() ? p : 0;
522     }
523     return 0;
524 }
525 
setupRTP(const char * localip,bool rtcp,bool warnSeq)526 void YRTPWrapper::setupRTP(const char* localip, bool rtcp, bool warnSeq)
527 {
528     TraceDebug(m_traceId,&splugin,DebugAll,"YRTPWrapper::setupRTP(\"%s\",%s,%s) [%p]",
529 	localip,String::boolText(rtcp),String::boolText(warnSeq),this);
530     m_rtp = new YRTPSession(this);
531     m_rtp->setWarnSeq(warnSeq);
532     m_rtp->initTransport();
533     bindLocal(localip,rtcp);
534 }
535 
setupUDPTL(const char * localip,u_int16_t maxLen,u_int8_t maxSec)536 void YRTPWrapper::setupUDPTL(const char* localip, u_int16_t maxLen, u_int8_t maxSec)
537 {
538     TraceDebug(m_traceId,&splugin,DebugAll,"YRTPWrapper::setupUDPTL(\"%s\",%u,%u) [%p]",
539 	localip,maxLen,maxSec,this);
540     m_udptl = new YUDPTLSession(this,maxLen,maxSec);
541     m_udptl->initTransport();
542     bindLocal(localip,false);
543 }
544 
setupUDPTL(Message & msg)545 bool YRTPWrapper::setupUDPTL(Message& msg)
546 {
547     TraceDebug(m_traceId,&splugin,DebugAll,"YRTPWrapper::setupUDPTL(%p '%s') [%p]",
548 	&msg,msg.c_str(),this);
549     if (!m_udptl)
550 	return false;
551     String tmp(m_udptl->maxLen());
552     msg.setParam("t38maxdatagram",tmp);
553     msg.setParam("osdp_T38FaxMaxDatagram",tmp);
554     msg.setParam("t38redundancy",String(m_udptl->maxSec()));
555     if (m_udptl->maxSec())
556 	msg.setParam("osdp_T38FaxUdpEC","t38UDPRedundancy");
557     return true;
558 }
559 
bindLocal(const char * localip,bool rtcp)560 bool YRTPWrapper::bindLocal(const char* localip, bool rtcp)
561 {
562     int minport = s_minport;
563     int maxport = s_maxport;
564     int attempt = 10;
565     if (minport > maxport) {
566 	int tmp = maxport;
567 	maxport = minport;
568 	minport = tmp;
569     }
570     else if (minport == maxport) {
571 	maxport++;
572 	attempt = 1;
573     }
574     SocketAddr addr(m_ipv6 ? SocketAddr::Unknown : SocketAddr::IPv4);
575     if (!addr.host(localip)) {
576 	TraceDebug(m_traceId,&splugin,DebugWarn,"Wrapper '%s' could not parse address '%s' [%p]",
577 	    m_id.c_str(),localip,this);
578 	return false;
579     }
580     for (; attempt; attempt--) {
581 	int lport = (minport + (Random::random() % (maxport - minport))) & 0xfffe;
582 	addr.port(lport);
583 	if (m_rtp ? m_rtp->localAddr(addr,rtcp) : m_udptl->localAddr(addr)) {
584 	    m_host = addr.host();
585 	    m_port = lport;
586 	    TraceDebug(m_traceId,&splugin,DebugInfo,"Session '%s' %p bound to %s%s [%p]",
587 		m_id.c_str(),session(),addr.addr().c_str(),(rtcp ? " +RTCP" : ""),this);
588 	    return true;
589 	}
590     }
591     TraceDebug(m_traceId,&splugin,DebugWarn,"YRTPWrapper '%s' bind failed in range %d-%d on '%s' [%p]",
592 	m_id.c_str(),minport,maxport,localip,this);
593     return false;
594 }
595 
setParams(const char * rip,Message & msg)596 bool YRTPWrapper::setParams(const char* rip, Message& msg)
597 {
598     // start or just setup either RTP or UDPTL
599     int rport = msg.getIntValue(YSTRING("remoteport"));
600     if (rip && (rport > 0))
601 	return m_udptl ? startUDPTL(rip,rport,msg) : startRTP(rip,rport,msg);
602     else
603 	return m_udptl ? setupUDPTL(msg) : setupSRTP(msg,msg.getBoolValue(YSTRING("secure")));
604 }
605 
setRemote(const char * raddr,unsigned int rport,const Message & msg)606 bool YRTPWrapper::setRemote(const char* raddr, unsigned int rport, const Message& msg)
607 {
608     if (!session())
609 	return false;
610     SocketAddr addr(m_ipv6 ? SocketAddr::Unknown : SocketAddr::IPv4);
611     if (!(addr.host(raddr) && addr.port(rport) && session()->remoteAddr(addr,msg.getBoolValue(YSTRING("autoaddr"),s_autoaddr)))) {
612 	TraceDebug(m_traceId,&splugin,DebugWarn,"RTP failed to set remote address %s [%p]",
613 	    SocketAddr::appendTo(raddr,rport).c_str(),this);
614 	return false;
615     }
616     return true;
617 }
618 
setTimeout(const Message & msg,int timeOut)619 void YRTPWrapper::setTimeout(const Message& msg, int timeOut)
620 {
621     const String* param = msg.getParam(YSTRING("timeout"));
622     if (param) {
623 	// accept true/false to apply default or disable
624 	if (param->isBoolean()) {
625 	    if (param->toBoolean())
626 		timeOut = rtp() ? s_timeout : s_udptlTimeout;
627 	    else
628 		timeOut = 0;
629 	}
630 	else
631 	    timeOut = param->toInteger(timeOut);
632     }
633     if ((timeOut >= 0) && session())
634 	session()->setTimeout(timeOut);
635 }
636 
setFormat(DataFormat & dest,const char * format,const NamedList & params)637 void YRTPWrapper::setFormat(DataFormat& dest, const char* format, const NamedList& params)
638 {
639     dest.clearParams();
640     dest = format;
641     const String& fmtp = params["sdp_fmtp:" + dest];
642     if (fmtp.null())
643 	return;
644     ObjList* lst = fmtp.split(';');
645     for (ObjList* l = lst->skipNull(); l; l = l->skipNext()) {
646 	const String* s = static_cast<const String*>(l->get());
647 	int pos = s->find('=');
648 	String name;
649 	String value;
650 	if (pos > 0) {
651 	    name = s->substr(0,pos);
652 	    value = s->substr(pos + 1);
653 	}
654 	else if (pos)
655 	    name = *s;
656 	if (name.trimSpaces())
657 	    dest.addParam(name,value.trimSpaces());
658     }
659     TelEngine::destruct(lst);
660 }
661 
startRTP(const char * raddr,unsigned int rport,Message & msg)662 bool YRTPWrapper::startRTP(const char* raddr, unsigned int rport, Message& msg)
663 {
664     TraceDebug(m_traceId,&splugin,DebugAll,"YRTPWrapper::startRTP(\"%s\",%u) [%p]",raddr,rport,this);
665     if (!m_rtp) {
666 	TraceDebug(m_traceId,&splugin,DebugWarn,"Wrapper attempted to start RTP before setup! [%p]",this);
667 	return false;
668     }
669 
670     if (m_bufsize) {
671 	DDebug(&splugin,DebugAll,"Wrapper attempted to restart RTP! [%p]",this);
672 	setRemote(raddr,rport,msg);
673 	m_rtp->resync();
674 	setTimeout(msg,-1);
675 	m_rtp->initDebugData(msg);
676 	return true;
677     }
678 
679     String p(msg.getValue(YSTRING("payload")));
680     if (p.null())
681 	p = msg.getValue(YSTRING("format"));
682     int payload = p.toInteger(dict_payloads,-1);
683     int evpayload = msg.getIntValue(YSTRING("evpayload"),101);
684     const char* format = msg.getValue(YSTRING("format"));
685     int tos = msg.getIntValue(YSTRING("tos"),Socket::tosValues(),s_tos);
686     int buflen = msg.getIntValue(YSTRING("buffer"),s_udpbuf);
687     int msec = msg.getIntValue(YSTRING("msleep"),s_sleep);
688 
689     if (!format)
690 	format = lookup(payload, dict_payloads);
691     if (!format) {
692 	if (payload < 0)
693 	    TraceDebug(m_traceId,&splugin,DebugWarn,"Wrapper neither format nor payload specified [%p]",this);
694 	else
695 	    TraceDebug(m_traceId,&splugin,DebugWarn,"Wrapper can't find name for payload %d [%p]",payload,this);
696 	return false;
697     }
698 
699     if (payload == -1)
700 	payload = lookup(format, dict_payloads, -1);
701     if (payload == -1) {
702 	TraceDebug(m_traceId,&splugin,DebugWarn,"Wrapper can't find payload for format %s [%p]",format,this);
703 	return false;
704     }
705 
706     if ((payload < 0) || (payload >= 127)) {
707 	TraceDebug(m_traceId,&splugin,DebugWarn,"Wrapper received invalid payload %d [%p]",payload,this);
708 	return false;
709     }
710 
711     TraceDebug(m_traceId,&splugin,DebugInfo,"RTP starting format '%s' payload %d [%p]",format,payload,this);
712 
713     if (!setRemote(raddr,rport,msg))
714 	return false;
715     m_rtp->anySSRC(msg.getBoolValue(YSTRING("anyssrc"),s_anyssrc));
716     m_format = format;
717     // Change format of source and/or consumer,
718     //  reinstall them to rebuild codec chains
719     if (m_source) {
720 	if (m_conn) {
721 	    m_source->ref();
722 	    m_conn->setSource(0,m_media);
723 	}
724 	setFormat(m_source->m_format,format,msg);
725 	if (m_conn) {
726 	    m_conn->setSource(m_source,m_media);
727 	    m_source->deref();
728 	}
729     }
730     if (m_consumer) {
731 	if (m_conn) {
732 	    m_consumer->ref();
733 	    m_conn->setConsumer(0,m_media);
734 	}
735 	setFormat(m_consumer->m_format,format,msg);
736 	m_consumer->setSplitable();
737 	if (m_conn) {
738 	    m_conn->setConsumer(m_consumer,m_media);
739 	    m_consumer->deref();
740 	}
741     }
742     if (!(m_rtp->initGroup(msec,Thread::priority(msg.getValue(YSTRING("thread")),s_priority),msg.getValue(YSTRING("affinity"),s_affinity)) &&
743 	 m_rtp->direction(m_dir)))
744 	return false;
745 
746     m_rtp->initDebugData(msg);
747     bool secure = false;
748     const String* sec = msg.getParam(YSTRING("crypto_suite"));
749     if (sec && *sec) {
750 	// separate crypto parameters
751 	const String* key = msg.getParam(YSTRING("crypto_key"));
752 	if (key && *key) {
753 	    if (startSRTP(*sec,*key,0))
754 		secure = true;
755 	    else
756 		TraceDebug(m_traceId,&splugin,DebugWarn,"Could not start SRTP for: '%s' '%s' [%p]",
757 		    sec->c_str(),key->c_str(),this);
758 	}
759 	sec = 0;
760 	msg.clearParam("crypto_suite");
761     }
762     secure = secure && setupSRTP(msg,true);
763     if (!secure)
764 	m_rtp->security(0);
765 
766     m_rtp->dataPayload(payload);
767     m_rtp->eventPayload(evpayload);
768     m_rtp->setTOS(tos);
769     if (buflen > 0)
770 	m_rtp->setBuffer(buflen);
771     m_rtp->padding(msg.getIntValue(YSTRING("padding"),s_padding));
772     if (msg.getBoolValue(YSTRING("drillhole"),s_drill)) {
773 	bool ok = m_rtp->drillHole();
774 	TraceDebug(m_traceId,&splugin,(ok ? DebugInfo : DebugWarn),
775 	    "Wrapper %s a hole in firewall/NAT [%p]",
776 	    (ok ? "opened" : "failed to open"),this);
777     }
778     setTimeout(msg,s_timeout);
779     m_rtp->setReports(msg.getIntValue(YSTRING("rtcp_interval"),s_interval));
780     // dejittering is only meaningful for audio
781     if (isAudio()){
782 	int minJitter = msg.getIntValue(YSTRING("minjitter"),s_minJitter);
783 	int maxJitter = msg.getIntValue(YSTRING("maxjitter"),s_maxJitter);
784 	if (minJitter >= 0 && maxJitter > 0)
785 	    m_rtp->setDejitter(minJitter*1000,maxJitter*1000);
786     }
787     m_bufsize = s_bufsize;
788     return true;
789 }
790 
startUDPTL(const char * raddr,unsigned int rport,Message & msg)791 bool YRTPWrapper::startUDPTL(const char* raddr, unsigned int rport, Message& msg)
792 {
793     TraceDebug(m_traceId,&splugin,DebugAll,"YRTPWrapper::startUDPTL(\"%s\",%u) [%p]",raddr,rport,this);
794     if (!m_udptl) {
795 	TraceDebug(m_traceId,&splugin,DebugWarn,"Wrapper attempted to start UDPTL before setup! [%p]",this);
796 	return false;
797     }
798 
799     int tos = msg.getIntValue(YSTRING("tos"),Socket::tosValues(),s_tos);
800     int msec = msg.getIntValue(YSTRING("msleep"),s_sleep);
801     if (!setRemote(raddr,rport,msg))
802 	return false;
803     if (!m_udptl->initGroup(msec,Thread::priority(msg.getValue(YSTRING("thread")),s_priority),
804 				msg.getValue(YSTRING("affinity"),s_affinity)))
805 	return false;
806 
807     m_udptl->setTOS(tos);
808     if (msg.getBoolValue(YSTRING("drillhole"),s_drill)) {
809 	bool ok = m_udptl->drillHole();
810 	TraceDebug(m_traceId,&splugin,(ok ? DebugInfo : DebugWarn),
811 	    "Wrapper %s a hole in firewall/NAT [%p]",
812 	    (ok ? "opened" : "failed to open"),this);
813     }
814     setTimeout(msg,s_udptlTimeout);
815     return setupUDPTL(msg);
816 }
817 
setupSRTP(Message & msg,bool buildMaster)818 bool YRTPWrapper::setupSRTP(Message& msg, bool buildMaster)
819 {
820     TraceDebug(m_traceId,&splugin,DebugAll,"YRTPWrapper::setupSRTP(%s) [%p]",
821 	String::boolText(buildMaster),this);
822     if (!m_rtp)
823 	return false;
824 
825     RTPSecure* srtp = m_rtp->security();
826     if (!srtp) {
827 	if (!buildMaster)
828 	    return false;
829 	if (m_rtp->receiver())
830 	    srtp = m_rtp->receiver()->security();
831 	if (srtp)
832 	    srtp = new RTPSecure(*srtp);
833 	else
834 	    srtp = new RTPSecure(msg[YSTRING("crypto_suite")],&splugin,m_traceId);
835     }
836     else
837 	buildMaster = false;
838 
839     String suite;
840     String key;
841     if (!(srtp->supported(m_rtp) && srtp->create(suite,key,true))) {
842 	if (buildMaster)
843 	    TelEngine::destruct(srtp);
844 	return false;
845     }
846     m_rtp->security(srtp);
847 
848     msg.setParam("ocrypto_suite",suite);
849     msg.setParam("ocrypto_key",key);
850     return true;
851 }
852 
startSRTP(const String & suite,const String & keyParams,const ObjList * paramList)853 bool YRTPWrapper::startSRTP(const String& suite, const String& keyParams, const ObjList* paramList)
854 {
855     TraceDebug(m_traceId,&splugin,DebugAll,"YRTPWrapper::startSRTP('%s','%s',%p) [%p]",
856 	suite.c_str(),keyParams.c_str(),paramList,this);
857     if (!(m_rtp && m_rtp->receiver()))
858 	return false;
859     RTPSecure* srtp = new RTPSecure(&splugin,m_traceId);
860     if (srtp->supported(m_rtp) && srtp->setup(suite,keyParams,paramList)) {
861 	m_rtp->receiver()->security(srtp);
862 	TraceDebug(m_traceId,&splugin,DebugCall,"Started SRTP suite '%s' [%p]",suite.c_str(),this);
863 	return true;
864     }
865     TelEngine::destruct(srtp);
866     return false;
867 }
868 
sendDTMF(char dtmf,int duration)869 bool YRTPWrapper::sendDTMF(char dtmf, int duration)
870 {
871     return m_rtp && m_rtp->rtpSendKey(dtmf,duration);
872 }
873 
gotDTMF(char tone)874 void YRTPWrapper::gotDTMF(char tone)
875 {
876     TraceDebug(m_traceId,&splugin,DebugInfo,"YRTPWrapper::gotDTMF('%c') [%p]",tone,this);
877     if (m_master.null())
878 	return;
879     char buf[2];
880     buf[0] = tone;
881     buf[1] = 0;
882     Message *m = new Message("chan.masquerade");
883     m->addParam("id",m_master);
884     m->addParam("message","chan.dtmf");
885     m->addParam("text",buf);
886     m->addParam("detected","rfc2833");
887     Engine::enqueue(m);
888 }
889 
gotFax()890 void YRTPWrapper::gotFax()
891 {
892     TraceDebug(m_traceId,&splugin,DebugInfo,"YRTPWrapper::gotFax() [%p]",this);
893     if (m_master.null())
894 	return;
895     Message* m = new Message("chan.masquerade");
896     m->addParam("id",m_master);
897     if (m_faxDivert) {
898 	TraceDebug(m_traceId,&splugin,DebugCall,"Diverting call %s to: %s",
899 	    m_master.c_str(),m_faxDivert.c_str());
900 	m->addParam("message","call.execute");
901 	m->addParam("callto",m_faxDivert);
902 	m->addParam("reason","fax");
903     }
904     else {
905 	m->addParam("message","call.fax");
906 	m->addParam("detected","rfc2833");
907     }
908     m->addParam("caller",m_faxCaller,false);
909     m->addParam("called",m_faxCalled,false);
910     Engine::enqueue(m);
911 }
912 
timeout(bool initial)913 void YRTPWrapper::timeout(bool initial)
914 {
915     if (!(initial ? s_warnFirst : s_warnLater))
916 	return;
917     if (initial)
918 	m_noAudio++;
919     else
920 	m_lostAudio++;
921     TraceDebug(m_traceId,&splugin,DebugWarn,"%s timeout in%s%s wrapper [%p]",
922 	(initial ? "Initial" : "Later"),
923 	(m_master ? " channel " : ""),
924 	m_master.safe(),this);
925     if (m_master && s_notifyMsg) {
926 	Message* m = new Message(s_notifyMsg);
927 	m->addParam("id",m_master);
928 	m->addParam("reason","nomedia");
929 	m->addParam("event","timeout");
930 	m->addParam("initial",String::boolText(initial));
931 	Engine::enqueue(m);
932     }
933 }
934 
guessLocal(const char * remoteip,String & localip,bool ipv6,const String & traceId)935 void YRTPWrapper::guessLocal(const char* remoteip, String& localip, bool ipv6, const String& traceId)
936 {
937     if (s_localip) {
938 	localip = s_localip;
939 	return;
940     }
941     localip.clear();
942     SocketAddr r(ipv6 ? SocketAddr::Unknown : SocketAddr::IPv4);
943     if (!r.host(remoteip)) {
944 	TraceDebug(traceId,&splugin,DebugNote,"Guess - Could not parse remote '%s'",remoteip);
945 	return;
946     }
947     SocketAddr l;
948     if (!l.local(r)) {
949 	TraceDebug(traceId,&splugin,DebugNote,"Guess - Could not guess local for remote '%s'",remoteip);
950 	return;
951     }
952     localip = l.host();
953     TraceDebug(traceId,&splugin,DebugInfo,"Guessed local IP '%s' for remote '%s'",localip.c_str(),remoteip);
954 }
955 
getSource()956 DataSource* YRTPWrapper::getSource()
957 {
958     if (m_source && m_source->ref())
959 	return m_source;
960     return new YRTPSource(this);
961 }
962 
getConsumer()963 DataConsumer* YRTPWrapper::getConsumer()
964 {
965     if (m_consumer && m_consumer->ref())
966 	return m_consumer;
967     return new YRTPConsumer(this);
968 }
969 
addDirection(RTPSession::Direction direction)970 void YRTPWrapper::addDirection(RTPSession::Direction direction)
971 {
972     m_dir = (RTPSession::Direction)(m_dir | direction);
973     if (m_rtp && m_bufsize)
974 	m_rtp->direction(m_dir);
975 }
976 
terminate(Message & msg)977 void YRTPWrapper::terminate(Message& msg)
978 {
979     TraceDebug(m_traceId,&splugin,DebugInfo,"YRTPWrapper::terminate() [%p]",this);
980     String stats;
981     if (m_rtp)
982 	m_rtp->getStats(stats);
983     if (m_udptl)
984 	m_udptl->getStats(stats);
985     if (stats)
986 	msg.setParam("stats",stats);
987     m_valid = false;
988 }
989 
setFaxDivert(const Message & msg)990 void YRTPWrapper::setFaxDivert(const Message& msg)
991 {
992     NamedString* divert = msg.getParam(YSTRING("fax_divert"));
993     if (!divert)
994 	return;
995     // if divert is empty or false disable diverting
996     if (divert->null() || !divert->toBoolean(true))
997 	m_faxDivert.clear();
998     else {
999 	m_faxDivert = *divert;
1000 	m_faxCaller = msg.getValue(YSTRING("fax_caller"),msg.getValue(YSTRING("caller"),m_faxCaller));
1001 	m_faxCalled = msg.getValue(YSTRING("fax_called"),msg.getValue(YSTRING("called"),m_faxCalled));
1002     }
1003 }
1004 
1005 
YRTPSession(YRTPWrapper * wrap)1006 YRTPSession::YRTPSession(YRTPWrapper* wrap)
1007     : RTPSession(&splugin,wrap ? wrap->traceId().c_str() : (const char*)0),
1008     m_wrap(wrap), m_lastLost(0), m_newPayload(-1),
1009     m_resync(false), m_anyssrc(false), m_getFax(true)
1010 {
1011 }
1012 
~YRTPSession()1013 YRTPSession::~YRTPSession()
1014 {
1015     // disconnect thread and transport before our virtual methods become invalid
1016     // this will also lock the group preventing rtpRecvData from being called
1017     group(0);
1018     transport(0);
1019 }
1020 
rtpRecvData(bool marker,unsigned int timestamp,const void * data,int len)1021 bool YRTPSession::rtpRecvData(bool marker, unsigned int timestamp, const void* data, int len)
1022 {
1023     s_srcMutex.lock();
1024     YRTPSource* source = m_wrap ? m_wrap->m_source : 0;
1025     // we MUST NOT reference count here as RTPGroup will crash if we remove
1026     // any RTPProcessor from its own thread
1027     if (source) {
1028 	if (source->alive())
1029 	    source->busy(true);
1030 	else
1031 	    source = 0;
1032     }
1033     s_srcMutex.unlock();
1034     if (!source)
1035 	return false;
1036     unsigned long flags = (marker ? DataNode::DataMark : 0);
1037     u_int32_t lost = ioPacketsLost();
1038     if (lost != m_lastLost) {
1039 	if (lost > m_lastLost)
1040 	    flags |= DataNode::DataMissed;
1041 	m_lastLost = lost;
1042     }
1043     // the source will not be destroyed until we reset the busy flag
1044     DataBlock block;
1045     block.assign((void*)data, len, false);
1046     source->Forward(block,timestamp,flags);
1047     block.clear(false);
1048     source->busy(false);
1049     return true;
1050 }
1051 
rtpRecvEvent(int event,char key,int duration,int volume,unsigned int timestamp)1052 bool YRTPSession::rtpRecvEvent(int event, char key, int duration,
1053 	int volume, unsigned int timestamp)
1054 {
1055     if (!m_wrap)
1056 	return false;
1057     if (key) {
1058 	m_wrap->gotDTMF(key);
1059 	return true;
1060     }
1061     if (event == 36) {
1062 	// got G3 CNG
1063 	if (m_getFax) {
1064 	    m_getFax = false;
1065 	    m_wrap->gotFax();
1066 	}
1067 	return true;
1068     }
1069     return false;
1070 }
1071 
rtpNewPayload(int payload,unsigned int timestamp)1072 void YRTPSession::rtpNewPayload(int payload, unsigned int timestamp)
1073 {
1074     if (payload == 13) {
1075 	TraceDebugObj(m_wrap,&splugin,DebugInfo,"Activating RTP silence payload %d in wrapper %p",payload,m_wrap);
1076 	silencePayload(payload);
1077     }
1078     else if (payload != m_newPayload) {
1079 	if (!receiver() || receiver()->dataPayload() != -1) {
1080 	    m_newPayload = payload;
1081 	    TraceDebugObj(m_wrap,&splugin,DebugMild,"Unexpected payload %d in wrapper %p",payload,m_wrap);
1082 	}
1083     }
1084 }
1085 
rtpNewSSRC(u_int32_t newSsrc,bool marker)1086 void YRTPSession::rtpNewSSRC(u_int32_t newSsrc, bool marker)
1087 {
1088     if ((m_anyssrc || m_resync) && receiver()) {
1089 	m_resync = false;
1090 	TraceDebugObj(m_wrap,&splugin,DebugInfo,"Changing SSRC from %08X to %08X in wrapper %p",
1091 	    receiver()->ssrc(),newSsrc,m_wrap);
1092 	receiver()->ssrc(newSsrc);
1093     }
1094 }
1095 
timeout(bool initial)1096 void YRTPSession::timeout(bool initial)
1097 {
1098     if (m_wrap)
1099 	m_wrap->timeout(initial);
1100 }
1101 
createCipher(const String & name,Cipher::Direction dir)1102 Cipher* YRTPSession::createCipher(const String& name, Cipher::Direction dir)
1103 {
1104     Message msg("engine.cipher");
1105     msg.addParam("cipher",name);
1106     msg.addParam("direction",lookup(dir,Cipher::directions(),"unknown"));
1107     CipherHolder* cHold = new CipherHolder;
1108     msg.userData(cHold);
1109     cHold->deref();
1110     return Engine::dispatch(msg) ? cHold->cipher() : 0;
1111 }
1112 
checkCipher(const String & name)1113 bool YRTPSession::checkCipher(const String& name)
1114 {
1115     Message msg("engine.cipher");
1116     msg.addParam("cipher",name);
1117     return Engine::dispatch(msg);
1118 }
1119 
1120 
~YUDPTLSession()1121 YUDPTLSession::~YUDPTLSession()
1122 {
1123     // disconnect thread and transport before our virtual methods become invalid
1124     // this will also lock the group preventing rtpRecvData from being called
1125     group(0);
1126     transport(0);
1127 }
1128 
udptlRecv(const void * data,int len,u_int16_t seq,bool recovered)1129 void YUDPTLSession::udptlRecv(const void* data, int len, u_int16_t seq, bool recovered)
1130 {
1131     s_srcMutex.lock();
1132     YRTPSource* source = m_wrap ? m_wrap->m_source : 0;
1133     // we MUST NOT reference count here as RTPGroup will crash if we remove
1134     // any RTPProcessor from its own thread
1135     if (source) {
1136 	if (source->alive())
1137 	    source->busy(true);
1138 	else
1139 	    source = 0;
1140     }
1141     s_srcMutex.unlock();
1142     if (!source)
1143 	return;
1144     // the source will not be destroyed until we reset the busy flag
1145     DataBlock block;
1146     block.assign((void*)data, len, false);
1147     // use the sequence number as timestamp, duplicates are possible
1148     source->Forward(block,seq);
1149     block.clear(false);
1150     source->busy(false);
1151 }
1152 
timeout(bool initial)1153 void YUDPTLSession::timeout(bool initial)
1154 {
1155     if (m_wrap)
1156 	m_wrap->timeout(initial);
1157 }
1158 
1159 
YRTPSource(YRTPWrapper * wrap)1160 YRTPSource::YRTPSource(YRTPWrapper* wrap)
1161     : m_wrap(wrap), m_busy(false)
1162 {
1163     TraceDebugObj(m_wrap,&splugin,DebugAll,"YRTPSource::YRTPSource(%p) [%p]",wrap,this);
1164     m_format.clear();
1165     if (m_wrap) {
1166 	m_wrap->ref();
1167 	m_format = m_wrap->m_format;
1168 	m_wrap->m_source = this;
1169     }
1170 }
1171 
~YRTPSource()1172 YRTPSource::~YRTPSource()
1173 {
1174     TraceDebugObj(m_wrap,&splugin,DebugAll,"YRTPSource::~YRTPSource() [%p] wrapper=%p ts=%lu",
1175 	this,m_wrap,m_timestamp);
1176     if (m_wrap) {
1177 	s_srcMutex.lock();
1178 	YRTPWrapper* tmp = m_wrap;
1179 	const YRTPSource* s = tmp->m_source;
1180 	m_wrap = 0;
1181 	tmp->m_source = 0;
1182 	s_srcMutex.unlock();
1183 	if (s != this)
1184 	    Debug(&splugin,DebugCrit,"Wrapper %p held source %p not [%p]",tmp,s,this);
1185 	// we have just to wait for any YRTPSession::rtpRecvData() to finish
1186 	while (m_busy)
1187 	    Thread::yield();
1188 	tmp->deref();
1189     }
1190 }
1191 
1192 
YRTPConsumer(YRTPWrapper * wrap)1193 YRTPConsumer::YRTPConsumer(YRTPWrapper *wrap)
1194     : m_wrap(wrap), m_splitable(false)
1195 {
1196     TraceDebugObj(m_wrap,&splugin,DebugAll,"YRTPConsumer::YRTPConsumer(%p) [%p]",wrap,this);
1197     m_format.clear();
1198     if (m_wrap) {
1199 	m_wrap->ref();
1200 	m_format = m_wrap->m_format;
1201 	if (m_format)
1202 	    setSplitable();
1203 	m_wrap->m_consumer = this;
1204     }
1205 }
1206 
~YRTPConsumer()1207 YRTPConsumer::~YRTPConsumer()
1208 {
1209     TraceDebugObj(m_wrap,&splugin,DebugAll,"YRTPConsumer::~YRTPConsumer() [%p] wrapper=%p ts=%lu",
1210 	this,m_wrap,m_timestamp);
1211     if (m_wrap) {
1212 	YRTPWrapper* tmp = m_wrap;
1213 	const YRTPConsumer* c = tmp->m_consumer;
1214 	m_wrap = 0;
1215 	tmp->m_consumer = 0;
1216 	tmp->deref();
1217 	if (c != this)
1218 	    TraceDebugObj(m_wrap,&splugin,DebugCrit,"Wrapper %p held consumer %p not [%p]",tmp,c,this);
1219     }
1220 }
1221 
Consume(const DataBlock & data,unsigned long tStamp,unsigned long flags)1222 unsigned long YRTPConsumer::Consume(const DataBlock &data, unsigned long tStamp, unsigned long flags)
1223 {
1224     if (!(m_wrap && m_wrap->valid()))
1225 	return 0;
1226     const char* ptr = (const char*)data.data();
1227     unsigned int len = data.length();
1228     if (len && m_wrap->udptl()) {
1229 	XDebug(&splugin,DebugAll,"YRTPConsumer writing %d UDPTL bytes, ts=%lu [%p]",
1230 	    data.length(),tStamp,this);
1231 	// for T.38 the timestamp is used as sequence number
1232 	m_wrap->udptl()->udptlSend(ptr,len,(uint16_t)tStamp);
1233 	return invalidStamp();
1234     }
1235     if (!(m_wrap->bufSize() && m_wrap->rtp()))
1236 	return 0;
1237     XDebug(&splugin,DebugAll,"YRTPConsumer writing %d bytes, ts=%lu [%p]",
1238 	data.length(),tStamp,this);
1239     unsigned int buf = m_wrap->bufSize();
1240     while (len && m_wrap && m_wrap->rtp()) {
1241 	unsigned int sz = len;
1242 	if (m_splitable && m_wrap->isAudio() && (sz > buf)) {
1243 	    // divide evenly a buffer that is multiple of preferred size
1244 	    if ((buf > BUF_PREF) && ((len % BUF_PREF) == 0))
1245 		sz = BUF_PREF;
1246 	    else
1247 		sz = buf;
1248 	    DDebug(&splugin,DebugAll,"Creating %u bytes fragment of %u bytes buffer",sz,len);
1249 	}
1250 	bool mark = (flags & DataMark) != 0;
1251 	flags &= ~DataMark;
1252 	m_wrap->rtp()->rtpSendData(mark,tStamp,ptr,sz);
1253 	// if timestamp increment is not provided we have to guess...
1254 	tStamp += sz;
1255 	len -= sz;
1256 	ptr += sz;
1257     }
1258     return invalidStamp();
1259 }
1260 
1261 
updateTimes(u_int64_t when)1262 void YRTPMonitor::updateTimes(u_int64_t when)
1263 {
1264     if (!m_start)
1265 	m_start = when;
1266     m_last = when;
1267 }
1268 
rtpData(const void * data,int len)1269 void YRTPMonitor::rtpData(const void* data, int len)
1270 {
1271     updateTimes(Time::now());
1272     m_rtpPackets++;
1273     m_rtpBytes += len;
1274     // we already know data is at least 12 bytes (RTP header) long
1275     m_payload = 0x7f & ((const unsigned char*)data)[1];
1276 }
1277 
rtcpData(const void * data,int len)1278 void YRTPMonitor::rtcpData(const void* data, int len)
1279 {
1280     updateTimes(Time::now());
1281     m_rtcpPackets++;
1282 }
1283 
timerTick(const Time & when)1284 void YRTPMonitor::timerTick(const Time& when)
1285 {
1286     if (!(m_id && m_last))
1287 	return;
1288     u_int64_t tout = 1000 * s_timeout;
1289     if (tout && ((m_last + tout) < when.usec()))
1290 	timeout(0 == m_start);
1291 }
1292 
timeout(bool initial)1293 void YRTPMonitor::timeout(bool initial)
1294 {
1295     if (null(m_id))
1296 	return;
1297     if (!(initial ? s_warnFirst : s_warnLater))
1298 	return;
1299     Debug(&splugin,DebugWarn,"%s timeout in '%s' reflector [%p]",
1300 	(initial ? "Initial" : "Later"),m_id->c_str(),this);
1301     if (s_notifyMsg) {
1302 	Message* m = new Message(s_notifyMsg);
1303 	m->addParam("id",m_id->c_str());
1304 	m->addParam("reason","nomedia");
1305 	m->addParam("event","timeout");
1306 	m->addParam("initial",String::boolText(initial));
1307 	Engine::enqueue(m);
1308     }
1309     // been there, done that, enough
1310     m_id = 0;
1311 }
1312 
startup()1313 void YRTPMonitor::startup()
1314 {
1315     if (0 == m_last)
1316 	m_last = Time::now();
1317 }
1318 
saveStats(Message & msg) const1319 void YRTPMonitor::saveStats(Message& msg) const
1320 {
1321     uint64_t d = m_start ? ((m_last - m_start + 500000) / 1000000) : 0;
1322     msg.addParam("rtp_rx_packets",String(m_rtpPackets));
1323     msg.addParam("rtcp_rx_packets",String(m_rtcpPackets));
1324     msg.addParam("rtp_rx_bytes",String(m_rtpBytes));
1325     msg.addParam("rtp_rx_duration",String(d));
1326     if (m_payload >= 0)
1327 	msg.addParam("rtp_rx_payload",String(m_payload));
1328 }
1329 
1330 
YRTPReflector(const String & id,bool passiveA,bool passiveB)1331 YRTPReflector::YRTPReflector(const String& id, bool passiveA, bool passiveB)
1332     : m_idA(id)
1333 {
1334     DDebug(&splugin,DebugInfo,"YRTPReflector::YRTPReflector('%s') [%p]",id.c_str(),this);
1335     m_group = new RTPGroup(s_sleep,s_priority);
1336     m_rtpA = new RTPTransport;
1337     m_rtpB = new RTPTransport;
1338     m_rtpA->setProcessor(m_rtpB);
1339     m_rtpB->setProcessor(m_rtpA);
1340     m_monA = new YRTPMonitor(passiveA ? 0 : &m_idA);
1341     m_rtpA->setMonitor(m_monA);
1342     m_monB = new YRTPMonitor(passiveB ? 0 : &m_idB);
1343     m_rtpB->setMonitor(m_monB);
1344     m_group->join(m_rtpA);
1345     m_group->join(m_rtpB);
1346     m_group->join(m_monA);
1347     m_group->join(m_monB);
1348 }
1349 
~YRTPReflector()1350 YRTPReflector::~YRTPReflector()
1351 {
1352     DDebug(&splugin,DebugInfo,"YRTPReflector::~YRTPReflector() [%p]",this);
1353     m_rtpA->setProcessor();
1354     m_rtpA->setMonitor();
1355     m_rtpB->setProcessor();
1356     m_rtpB->setMonitor();
1357     m_group->part(m_rtpA);
1358     m_group->part(m_monA);
1359     m_group->part(m_rtpB);
1360     m_group->part(m_monB);
1361     m_group = 0;
1362     TelEngine::destruct(m_rtpA);
1363     TelEngine::destruct(m_rtpB);
1364     TelEngine::destruct(m_monA);
1365     TelEngine::destruct(m_monB);
1366     s_mutex.lock();
1367     splugin.changed();
1368     s_mutex.unlock();
1369 }
1370 
1371 
received(Message & msg)1372 bool AttachHandler::received(Message &msg)
1373 {
1374     int more = 2;
1375     String src(msg.getValue(YSTRING("source")));
1376     if (src.null())
1377 	more--;
1378     else {
1379 	if (src.startSkip("rtp/",false))
1380 	    more--;
1381 	else
1382 	    src = "";
1383     }
1384 
1385     String cons(msg.getValue(YSTRING("consumer")));
1386     if (cons.null())
1387 	more--;
1388     else {
1389 	if (cons.startSkip("rtp/",false))
1390 	    more--;
1391 	else
1392 	    cons = "";
1393     }
1394     if (src.null() && cons.null())
1395 	return false;
1396 
1397     const String& traceId = msg[YSTRING("trace_id")];
1398     const char* media = msg.getValue(YSTRING("media"),"audio");
1399     const String& rip = msg[YSTRING("remoteip")];
1400     CallEndpoint* ch = YOBJECT(CallEndpoint,msg.userData());
1401     if (!ch) {
1402 	if (!src.null())
1403 	    TraceDebug(traceId,&splugin,DebugWarn,"RTP source '%s' attach request with no call channel!",src.c_str());
1404 	if (!cons.null())
1405 	    TraceDebug(traceId,&splugin,DebugWarn,"RTP consumer '%s' attach request with no call channel!",cons.c_str());
1406 	return false;
1407     }
1408 
1409     RefPointer<YRTPWrapper> w = YRTPWrapper::find(ch,media);
1410     if (!w)
1411 	w = YRTPWrapper::find(msg[YSTRING("rtpid")]);
1412     if (!w) {
1413 	String lip(msg.getValue(YSTRING("localip")));
1414 	bool ipv6 = msg.getBoolValue(YSTRING("ipv6_support"),s_ipv6);
1415 	if (lip.null())
1416 	    YRTPWrapper::guessLocal(rip,lip,ipv6);
1417 	w = new YRTPWrapper(lip,ch,media,RTPSession::SendRecv,msg,false,ipv6);
1418 	w->setMaster(msg.getValue(YSTRING("id")));
1419 
1420 	if (!src.null()) {
1421 	    DataSource* s = w->getSource();
1422 	    ch->setSource(s,media);
1423 	    s->deref();
1424 	}
1425 
1426 	if (!cons.null()) {
1427 	    DataConsumer* c = w->getConsumer();
1428 	    ch->setConsumer(c,media);
1429 	    c->deref();
1430 	}
1431     }
1432 
1433     w->deref();
1434     if (w->refcount() <= 1)
1435 	return false;
1436 
1437     w->setParams(rip,msg);
1438     w->setFaxDivert(msg);
1439     msg.setParam("localip",w->host());
1440     msg.setParam("localport",String(w->port()));
1441     msg.setParam("rtpid",w->id());
1442 
1443     // Stop dispatching if we handled all requested
1444     return !more;
1445 }
1446 
1447 
received(Message & msg)1448 bool RtpHandler::received(Message &msg)
1449 {
1450     bool udptl = false;
1451     const String& traceId = msg[YSTRING("trace_id")];
1452     const String& trans = msg[YSTRING("transport")];
1453     if (trans && !trans.startsWith("RTP/")) {
1454 	if (trans &= "udptl")
1455 	    udptl = true;
1456 	else
1457 	    return false;
1458     }
1459     TraceDebug(traceId,&splugin,DebugAll,"%s message received",(trans ? trans.c_str() : "No-transport"));
1460     bool terminate = msg.getBoolValue(YSTRING("terminate"),false);
1461     const String& dir = msg[YSTRING("direction")];
1462     RTPSession::Direction direction = terminate ? RTPSession::FullStop : RTPSession::SendRecv;
1463     bool d_recv = false;
1464     bool d_send = false;
1465     if (dir == YSTRING("bidir")) {
1466 	d_recv = true;
1467 	d_send = true;
1468     }
1469     else if (dir == YSTRING("receive")) {
1470 	d_recv = true;
1471 	direction = RTPSession::RecvOnly;
1472     }
1473     else if (dir == YSTRING("send")) {
1474 	d_send = true;
1475 	direction = RTPSession::SendOnly;
1476     }
1477 
1478     CallEndpoint* ch = YOBJECT(CallEndpoint,msg.userData());
1479     DataEndpoint* de = YOBJECT(DataEndpoint,msg.userData());
1480     const char* media = udptl ? "image" : "audio";
1481     media = msg.getValue(YSTRING("media"),(de ? de->name().c_str() : media));
1482     RefPointer<YRTPWrapper> w = YRTPWrapper::find(ch,media);
1483     if (w)
1484 	TraceDebug(traceId,&splugin,DebugAll,"Wrapper %p found by CallEndpoint %p",(YRTPWrapper*)w,ch);
1485     else {
1486 	const String& rid = msg[YSTRING("rtpid")];
1487 	w = YRTPWrapper::find(rid);
1488 	if (w)
1489 	    TraceDebug(traceId,&splugin,DebugAll,"Wrapper %p found by ID '%s'",(YRTPWrapper*)w,rid.c_str());
1490     }
1491     if (w)
1492 	w->deref();
1493     if (terminate) {
1494 	if (w) {
1495 	    if (w->host())
1496 		msg.setParam("localip",w->host());
1497 	    if (w->port())
1498 		msg.setParam("localport",String(w->port()));
1499 	    w->terminate(msg);
1500 	    msg.setParam("status","terminated");
1501 	    return true;
1502 	}
1503 	return false;
1504     }
1505     if (!(ch || de || w)) {
1506 	TraceDebug(traceId,&splugin,DebugWarn,"Neither call channel nor RTP wrapper found!");
1507 	return false;
1508     }
1509 
1510     const String& rip = msg[YSTRING("remoteip")];
1511     const char* status = "updated";
1512 
1513     if (!w) {
1514 	// it would be pointless to create an unreferenced wrapper
1515 	if (!(d_recv || d_send))
1516 	    return false;
1517 	String lip(msg.getValue(YSTRING("localip")));
1518 	bool ipv6 = msg.getBoolValue(YSTRING("ipv6_support"),s_ipv6);
1519 	if (lip.null())
1520 	    YRTPWrapper::guessLocal(rip,lip,ipv6,traceId);
1521 	if (lip.null()) {
1522 	    TraceDebug(traceId,&splugin,DebugWarn,"RTP request with no local address!");
1523 	    return false;
1524 	}
1525 
1526 	status = "created";
1527 	w = new YRTPWrapper(lip,ch,media,direction,msg,udptl,ipv6);
1528 	w->setMaster(msg.getValue(YSTRING("id")));
1529 
1530 	w->deref();
1531     }
1532     else if (w->valid())
1533 	w->addDirection(direction);
1534     else
1535 	return false;
1536 
1537     if (d_recv) {
1538 	if (ch && !ch->getSource(media)) {
1539 	    DataSource* s = w->getSource();
1540 	    ch->setSource(s,media);
1541 	    s->deref();
1542 	}
1543 	else if (de && !de->getSource()) {
1544 	    DataSource* s = w->getSource();
1545 	    de->setSource(s);
1546 	    s->deref();
1547 	}
1548     }
1549 
1550     if (d_send) {
1551 	if (ch && !ch->getConsumer(media)) {
1552 	    DataConsumer* c = w->getConsumer();
1553 	    ch->setConsumer(c,media);
1554 	    c->deref();
1555 	}
1556 	else if (de && !de->getConsumer()) {
1557 	    DataConsumer* c = w->getConsumer();
1558 	    de->setConsumer(c);
1559 	    c->deref();
1560 	}
1561     }
1562 
1563     if (w->refcount() <= 1)
1564 	return false;
1565 
1566     w->setParams(rip,msg);
1567     w->setFaxDivert(msg);
1568     msg.setParam("localip",w->host());
1569     msg.setParam("localport",String(w->port()));
1570     msg.setParam("rtpid",w->id());
1571     msg.setParam("status",status);
1572 
1573     if (msg.getBoolValue(YSTRING("getsession"),!msg.userData()))
1574 	msg.userData(w);
1575     return true;
1576 }
1577 
1578 
received(Message & msg)1579 bool DTMFHandler::received(Message &msg)
1580 {
1581     const String& targetid = msg[YSTRING("targetid")];
1582     if (targetid.null())
1583 	return false;
1584     const String& text = msg[YSTRING("text")];
1585     if (text.null())
1586 	return false;
1587     RefPointer<YRTPWrapper> wrap = YRTPWrapper::find(targetid);
1588     if (!wrap)
1589 	return false;
1590     wrap->deref();
1591     if (wrap->rtp()) {
1592 	Debug(&splugin,DebugInfo,"RTP DTMF '%s' targetid '%s'",text.c_str(),targetid.c_str());
1593 	int duration = msg.getIntValue(YSTRING("duration"));
1594 	for (unsigned int i=0;i<text.length();i++)
1595 	    wrap->sendDTMF(text.at(i),duration);
1596 	return true;
1597     }
1598     return false;
1599 }
1600 
1601 
YRTPPlugin()1602 YRTPPlugin::YRTPPlugin()
1603     : Module("yrtp","misc"), m_first(true)
1604 {
1605     Output("Loaded module YRTP");
1606 }
1607 
~YRTPPlugin()1608 YRTPPlugin::~YRTPPlugin()
1609 {
1610     Output("Unloading module YRTP");
1611     s_calls.clear();
1612     s_mirrors.clear();
1613 }
1614 
genUpdate(Message & msg)1615 void YRTPPlugin::genUpdate(Message& msg)
1616 {
1617     s_mutex.lock();
1618     msg.setParam("chans",String(s_calls.count()));
1619     s_mutex.unlock();
1620     s_refMutex.lock();
1621     msg.setParam("mirrors",String(s_mirrors.count()));
1622     s_refMutex.unlock();
1623 }
1624 
statusParams(String & str)1625 void YRTPPlugin::statusParams(String& str)
1626 {
1627     s_mutex.lock();
1628     str.append("chans=",",") << s_calls.count();
1629     s_mutex.unlock();
1630     s_refMutex.lock();
1631     str.append("mirrors=",",") << s_mirrors.count();
1632     s_refMutex.unlock();
1633 }
1634 
statusDetail(String & str)1635 void YRTPPlugin::statusDetail(String& str)
1636 {
1637     s_mutex.lock();
1638     ObjList* l = s_calls.skipNull();
1639     for (; l; l=l->skipNext()) {
1640 	YRTPWrapper* w = static_cast<YRTPWrapper*>(l->get());
1641         str.append(w->id(),",") << "=" << w->callId();
1642     }
1643     s_mutex.unlock();
1644     s_refMutex.lock();
1645     for (l = s_mirrors.skipNull(); l; l=l->skipNext()) {
1646 	YRTPReflector* r = static_cast<YRTPReflector*>(l->get());
1647         str.append(r->idA(),",") << "=" << r->idB().safe("?");
1648     }
1649     s_refMutex.unlock();
1650 }
1651 
1652 static Regexp s_reflectMatch(
1653     "^\\(.*o=[^[:cntrl:]]\\+ IN IP4 \\)"
1654     "\\([0-9]\\+\\.[0-9]\\+\\.[0-9]\\+\\.[0-9]\\+\\)"
1655     "\\([[:cntrl:]].*c=IN IP4 \\)"
1656     "\\([0-9]\\+\\.[0-9]\\+\\.[0-9]\\+\\.[0-9]\\+\\)"
1657     "\\([[:cntrl:]].*m=audio \\)"
1658     "\\([0-9]\\+\\)"
1659     "\\( RTP/.*\\)$"
1660 );
1661 
reflectSetup(Message & msg,const char * id,RTPTransport & rtp,const char * rHost,const char * leg)1662 bool YRTPPlugin::reflectSetup(Message& msg, const char* id, RTPTransport& rtp,
1663     const char* rHost, const char* leg)
1664 {
1665     String lip(msg.getValue(YSTRING("rtp_localip")));
1666     if (lip.null())
1667 	YRTPWrapper::guessLocal(rHost,lip,false);
1668     SocketAddr addr(AF_INET);
1669     if (!addr.host(lip)) {
1670 	Debug(this,DebugWarn,"Bad local RTP address '%s' for %s '%s'",
1671 	    lip.c_str(),leg,id);
1672 	return false;
1673     }
1674 
1675     int minport = msg.getIntValue(YSTRING("rtp_minport"),s_minport);
1676     int maxport = msg.getIntValue(YSTRING("rtp_maxport"),s_maxport);
1677     int attempt = 10;
1678     if (minport > maxport) {
1679 	int tmp = maxport;
1680 	maxport = minport;
1681 	minport = tmp;
1682     }
1683     else if (minport == maxport) {
1684 	maxport++;
1685 	attempt = 1;
1686     }
1687     bool rtcp = msg.getBoolValue(YSTRING("rtp_rtcp"),s_rtcp);
1688     for (;;) {
1689 	int lport = (minport + (Random::random() % (maxport - minport))) & 0xfffe;
1690 	addr.port(lport);
1691 	if (rtp.localAddr(addr,rtcp)) {
1692 	    Debug(this,DebugInfo,"Reflector %s for '%s' bound to %s:%u%s",
1693 		leg,id,lip.c_str(),lport,(rtcp ? " +RTCP" : ""));
1694 	    break;
1695 	}
1696 	if (--attempt <= 0) {
1697 	    Debug(this,DebugWarn,"Could not bind reflector %s for '%s' in range %d - %d",
1698 		leg,id,minport,maxport);
1699 	    return false;
1700 	}
1701     }
1702     return true;
1703 }
1704 
reflectStart(Message & msg,const char * id,RTPTransport & rtp,SocketAddr & rAddr)1705 bool YRTPPlugin::reflectStart(Message& msg, const char* id, RTPTransport& rtp,
1706     SocketAddr& rAddr)
1707 {
1708     if (!rtp.remoteAddr(rAddr,msg.getBoolValue(YSTRING("rtp_autoaddr"),s_autoaddr))) {
1709 	Debug(this,DebugWarn,"Could not set remote RTP address for '%s'",id);
1710 	return false;
1711     }
1712     if (msg.getBoolValue(YSTRING("rtp_drillhole"),s_drill)) {
1713 	bool ok = rtp.drillHole();
1714 	Debug(this,(ok ? DebugInfo : DebugWarn),
1715 	    "Reflector for '%s' %s a hole in firewall/NAT",
1716 	    id,(ok ? "opened" : "failed to open"));
1717     }
1718     return true;
1719 }
1720 
reflectDrop(YRTPReflector * & refl,Lock & mylock)1721 void YRTPPlugin::reflectDrop(YRTPReflector*& refl, Lock& mylock)
1722 {
1723     s_mirrors.remove(refl,false);
1724     mylock.drop();
1725     Message* m = new Message("call.drop");
1726     m->addParam("id",refl->idA());
1727     m->addParam("reason","nomedia");
1728     TelEngine::destruct(refl);
1729     Engine::enqueue(m);
1730 }
1731 
reflectExecute(Message & msg)1732 void YRTPPlugin::reflectExecute(Message& msg)
1733 {
1734     const String* id = msg.getParam(YSTRING("id"));
1735     if (null(id))
1736 	return;
1737     String* sdp = msg.getParam(YSTRING("sdp_raw"));
1738     if (null(sdp))
1739 	return;
1740     if (!(msg.getBoolValue(YSTRING("rtp_forward"),false) && msg.getBoolValue(YSTRING("rtp_reflect"),false)))
1741 	return;
1742     DDebug(this,DebugAll,"YRTPPlugin::reflectExecute() A='%s'",id->c_str());
1743     // we have a candidate
1744     if (!sdp->matches(s_reflectMatch)) {
1745 	Debug(this,DebugWarn,"Unable to match SDP to reflect RTP for '%s'",id->c_str());
1746 	return;
1747     }
1748     SocketAddr ra(AF_INET);
1749     if (!(ra.host(sdp->matchString(4)) && ra.port(sdp->matchString(6).toInteger(-1)) && ra.valid())) {
1750 	Debug(this,DebugWarn,"Invalid RTP transport address for '%s'",id->c_str());
1751 	return;
1752     }
1753     const char* aHost = msg.getValue(YSTRING("rtp_addr"),ra.host().c_str());
1754     const char* bHost = msg.getValue(YSTRING("rtp_remoteip"),aHost);
1755     YRTPReflector* r = new YRTPReflector(*id,
1756 	(sdp->find("a=recvonly") >= 0),(sdp->find("a=sendonly") >= 0));
1757     if (!(reflectSetup(msg,id->c_str(),r->rtpA(),aHost,"A") &&
1758 	reflectStart(msg,id->c_str(),r->rtpA(),ra) &&
1759 	reflectSetup(msg,id->c_str(),r->rtpB(),bHost,"B"))) {
1760 	TelEngine::destruct(r);
1761 	return;
1762     }
1763     String templ;
1764     templ << "\\1" << r->rtpB().localAddr().host();
1765     templ << "\\3" << r->rtpB().localAddr().host();
1766     templ << "\\5" << r->rtpB().localAddr().port() << "\\7";
1767     *sdp = sdp->replaceMatches(templ);
1768     s_refMutex.lock();
1769     s_mirrors.append(r);
1770     s_refMutex.unlock();
1771     s_mutex.lock();
1772     changed();
1773     s_mutex.unlock();
1774 }
1775 
reflectAnswer(Message & msg,bool ignore)1776 void YRTPPlugin::reflectAnswer(Message& msg, bool ignore)
1777 {
1778     const String* peerid = msg.getParam(YSTRING("peerid"));
1779     if (null(peerid))
1780 	return;
1781     YRTPReflector* r = 0;
1782     Lock mylock(s_refMutex);
1783     ObjList* l = s_mirrors.skipNull();
1784     for (; l; l=l->skipNext()) {
1785 	r = static_cast<YRTPReflector*>(l->get());
1786 	if (r->idA() == *peerid)
1787 	    break;
1788 	r = 0;
1789     }
1790     if (!r)
1791 	return;
1792     DDebug(this,DebugAll,"YRTPPlugin::reflectAnswer() A='%s'",peerid->c_str());
1793     const String* id = msg.getParam(YSTRING("id"));
1794     if (null(id)) {
1795 	if (ignore)
1796 	    return;
1797 	Debug(this,DebugWarn,"Peer of RTP reflection '%s' answered without ID",peerid->c_str());
1798 	reflectDrop(r,mylock);
1799 	return;
1800     }
1801     if (r->idB() && (r->idB() != *id)) {
1802 	Debug(this,DebugWarn,"Reflect target of '%s' changed from '%s' to '%s'",
1803 	    peerid->c_str(),r->idB().c_str(),id->c_str());
1804 	reflectDrop(r,mylock);
1805 	return;
1806     }
1807     String* sdp = msg.getParam(YSTRING("sdp_raw"));
1808     if (null(sdp) || !msg.getBoolValue(YSTRING("rtp_forward"),false)) {
1809 	if (ignore)
1810 	    return;
1811 	Debug(this,DebugWarn,"Unable to complete RTP reflection for '%s'",peerid->c_str());
1812 	reflectDrop(r,mylock);
1813 	return;
1814     }
1815     if (!sdp->matches(s_reflectMatch)) {
1816 	if (ignore)
1817 	    return;
1818 	Debug(this,DebugWarn,"Unable to match SDP to reflect RTP for '%s'",id->c_str());
1819 	reflectDrop(r,mylock);
1820 	return;
1821     }
1822     if (r->idB().null())
1823 	r->setB(*id);
1824     SocketAddr ra(AF_INET);
1825     if (!(ra.host(sdp->matchString(4)) && ra.port(sdp->matchString(6).toInteger(-1)) && ra.valid())) {
1826 	Debug(this,DebugWarn,"Invalid RTP transport address for '%s'",id->c_str());
1827 	reflectDrop(r,mylock);
1828 	return;
1829     }
1830     if (!reflectStart(msg,id->c_str(),r->rtpB(),ra)) {
1831 	reflectDrop(r,mylock);
1832 	return;
1833     }
1834     r->monA().startup();
1835     r->monB().startup();
1836     String templ;
1837     templ << "\\1" << r->rtpA().localAddr().host();
1838     templ << "\\3" << r->rtpA().localAddr().host();
1839     templ << "\\5" << r->rtpA().localAddr().port() << "\\7";
1840     *sdp = sdp->replaceMatches(templ);
1841 }
1842 
reflectHangup(Message & msg)1843 void YRTPPlugin::reflectHangup(Message& msg)
1844 {
1845     const String* id = msg.getParam(YSTRING("id"));
1846     if (null(id))
1847 	return;
1848     Lock mylock(s_refMutex);
1849     ObjList* l = s_mirrors.skipNull();
1850     for (; l; l=l->skipNext()) {
1851 	YRTPReflector* r = static_cast<YRTPReflector*>(l->get());
1852 	if (r->idA() == *id) {
1853 	    DDebug(this,DebugAll,"YRTPPlugin::reflectHangup() A='%s' B='%s'",
1854 		id->c_str(),r->idB().c_str());
1855 	    r->setA(String::empty());
1856 	    r->monA().saveStats(msg);
1857 	    if (r->idB())
1858 		return;
1859 	}
1860 	else if (r->idB() == *id) {
1861 	    DDebug(this,DebugAll,"YRTPPlugin::reflectHangup() B='%s' A='%s'",
1862 		id->c_str(),r->idA().c_str());
1863 	    r->setB(String::empty());
1864 	    r->monB().saveStats(msg);
1865 	    if (r->idA())
1866 		return;
1867 	}
1868 	else
1869 	    continue;
1870 	s_mirrors.remove(r,false);
1871 	mylock.drop();
1872 	TelEngine::destruct(r);
1873 	break;
1874     }
1875 }
1876 
received(Message & msg,int id)1877 bool YRTPPlugin::received(Message& msg, int id)
1878 {
1879     switch (id) {
1880 	case Execute:
1881 	    reflectExecute(msg);
1882 	    return false;
1883 	case Ringing:
1884 	case Progress:
1885 	    reflectAnswer(msg,true);
1886 	    return false;
1887 	case Answered:
1888 	    reflectAnswer(msg,false);
1889 	    return false;
1890 	case Private:
1891 	    reflectHangup(msg);
1892 	    return false;
1893 	default:
1894 	    return Module::received(msg,id);
1895     }
1896 }
1897 
initialize()1898 void YRTPPlugin::initialize()
1899 {
1900     Output("Initializing module YRTP");
1901     Configuration cfg(Engine::configFile("yrtpchan"));
1902     s_ipv6 = SocketAddr::supports(SocketAddr::IPv6) &&
1903 	cfg.getBoolValue("general","ipv6_support",false);
1904     s_minport = cfg.getIntValue("general","minport",MIN_PORT);
1905     s_maxport = cfg.getIntValue("general","maxport",MAX_PORT);
1906     s_bufsize = cfg.getIntValue("general","buffer",BUF_SIZE);
1907     s_minJitter = cfg.getIntValue("general","minjitter",50);
1908     s_maxJitter = cfg.getIntValue("general","maxjitter",Engine::clientMode() ? 120 : 0);
1909     s_tos = cfg.getIntValue("general","tos",Socket::tosValues());
1910     s_udpbuf = cfg.getIntValue("general","udpbuf",0);
1911     s_localip = cfg.getValue("general","localip");
1912     s_autoaddr = cfg.getBoolValue("general","autoaddr",true);
1913     s_anyssrc = cfg.getBoolValue("general","anyssrc",true);
1914     s_padding = cfg.getIntValue("general","padding",0);
1915     s_rtcp = cfg.getBoolValue("general","rtcp",true);
1916     s_interval = cfg.getIntValue("general","rtcp_interval",4500);
1917     s_drill = cfg.getBoolValue("general","drillhole",Engine::clientMode());
1918     s_monitor = cfg.getBoolValue("general","monitoring",false);
1919     s_sleep = cfg.getIntValue("general","defsleep",5);
1920     RTPGroup::setMinSleep(cfg.getIntValue("general","minsleep"));
1921     s_priority = Thread::priority(cfg.getValue("general","thread"));
1922     s_affinity = cfg.getValue("general","affinity");
1923     s_rtpWarnSeq = cfg.getBoolValue("general","rtp_warn_seq",true);
1924     s_timeout = cfg.getIntValue("timeouts","timeout",3000);
1925     s_udptlTimeout = cfg.getIntValue("timeouts","udptl_timeout",25000);
1926     s_notifyMsg = cfg.getValue("timeouts","notifymsg");
1927     s_warnFirst = cfg.getBoolValue("timeouts","warnfirst",true);
1928     s_warnLater = cfg.getBoolValue("timeouts","warnlater",false);
1929     setup();
1930     if (m_first) {
1931 	m_first = false;
1932 	installRelay(Execute,50);
1933 	installRelay(Ringing,50);
1934 	installRelay(Progress,50);
1935 	installRelay(Answered,50);
1936 	installRelay(Private,"chan.hangup",50);
1937 	Engine::install(new AttachHandler);
1938 	Engine::install(new RtpHandler);
1939 	Engine::install(new DTMFHandler);
1940     }
1941 }
1942 
1943 }; // anonymous namespace
1944 
1945 /* vi: set ts=8 sw=4 sts=4 noet: */
1946