1 /**
2  * Channel.cpp
3  * This file is part of the YATE Project http://YATE.null.ro
4  *
5  * Yet Another Telephony Engine - a fully featured software PBX and IVR
6  * Copyright (C) 2004-2014 Null Team
7  *
8  * This software is distributed under multiple licenses;
9  * see the COPYING file in the main directory for licensing
10  * information for this specific distribution.
11  *
12  * This use of this software may be subject to additional restrictions.
13  * See the LEGAL file in the main directory for details.
14  *
15  * This program is distributed in the hope that it will be useful,
16  * but WITHOUT ANY WARRANTY; without even the implied warranty of
17  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18  */
19 
20 #include "yatephone.h"
21 
22 #include <string.h>
23 #include <stdlib.h>
24 
25 using namespace TelEngine;
26 
27 // Find if a string appears to be an E164 phone number
isE164(const char * str)28 bool TelEngine::isE164(const char* str)
29 {
30     if (!str)
31 	return false;
32     // an initial + character is ok, we skip it
33     if (*str == '+')
34 	str++;
35     // at least one valid character is required
36     if (!*str)
37 	return false;
38     for (;;) {
39 	switch (*str++) {
40 	    case '0':
41 	    case '1':
42 	    case '2':
43 	    case '3':
44 	    case '4':
45 	    case '5':
46 	    case '6':
47 	    case '7':
48 	    case '8':
49 	    case '9':
50 	    case '*':
51 	    case '#':
52 		break;
53 	    case '\0':
54 		return true;
55 	    default:
56 		return false;
57 	}
58     }
59 }
60 
61 static unsigned int s_callid = 0;
62 static Mutex s_callidMutex(false,"CallID");
63 
64 // this is to protect against two threads trying to (dis)connect a pair
65 //  of call endpoints at the same time
66 static Mutex s_mutex(true,"CallEndpoint");
67 static Mutex s_lastMutex(false,"CallEndpoint::last");
68 static const String s_audioType = "audio";
69 static const String s_copyParams = "copyparams";
70 
71 // Check if a Lock taken on the common mutex succeeded, wait up to 55s more in congestion
checkRetry(Lock & lock)72 static bool checkRetry(Lock& lock)
73 {
74     if (lock.locked())
75 	return true;
76     Engine::setCongestion("Call endpoint mutex busy");
77     bool ok = lock.acquire(s_mutex,55000000);
78     Engine::setCongestion();
79     return ok;
80 }
81 
82 
CallEndpoint(const char * id)83 CallEndpoint::CallEndpoint(const char* id)
84     : m_peer(0), m_lastPeer(0), m_id(id), m_mutex(0)
85 {
86 }
87 
destroyed()88 void CallEndpoint::destroyed()
89 {
90 #ifdef DEBUG
91     ObjList* l = m_data.skipNull();
92     for (; l; l=l->skipNext()) {
93 	DataEndpoint* e = static_cast<DataEndpoint*>(l->get());
94 	Debug(DebugAll,"Endpoint at %p type '%s' refcount=%d",e,e->name().c_str(),e->refcount());
95     }
96 #endif
97     disconnect(true,0,true,0);
98     clearEndpoint();
99     m_lastPeer = 0;
100 }
101 
commonMutex()102 Mutex& CallEndpoint::commonMutex()
103 {
104     return s_mutex;
105 }
106 
getObject(const String & name) const107 void* CallEndpoint::getObject(const String& name) const
108 {
109     if (name == YATOM("CallEndpoint"))
110 	return const_cast<CallEndpoint*>(this);
111     return RefObject::getObject(name);
112 }
113 
setId(const char * newId)114 void CallEndpoint::setId(const char* newId)
115 {
116     m_id = newId;
117 }
118 
connect(CallEndpoint * peer,const char * reason,bool notify)119 bool CallEndpoint::connect(CallEndpoint* peer, const char* reason, bool notify)
120 {
121     if (!peer) {
122 	disconnect(reason,notify);
123 	return false;
124     }
125     if (peer == m_peer)
126 	return true;
127     if (peer == this) {
128 	TraceDebug(traceId(),DebugWarn,"CallEndpoint '%s' trying to connect to itself! [%p]",m_id.c_str(),this);
129 	return false;
130     }
131     DDebug(DebugAll,"CallEndpoint '%s' connecting peer %p to [%p]",m_id.c_str(),peer,this);
132 
133 #if 0
134     Lock lock(s_mutex,5000000);
135     if (!checkRetry(lock)) {
136 	Alarm("engine","bug",DebugFail,"Call connect failed - timeout on call endpoint mutex owned by '%s'!",s_mutex.owner());
137 	Engine::restart(0);
138 	return false;
139     }
140 #endif
141 
142     // are we already dead?
143     if (!ref())
144 	return false;
145     disconnect(reason,notify);
146     // is our intended peer dead?
147     if (!peer->ref()) {
148 	deref();
149 	return false;
150     }
151     peer->disconnect(reason,notify);
152 
153     ObjList* l = m_data.skipNull();
154     for (; l; l=l->skipNext()) {
155 	DataEndpoint* e = static_cast<DataEndpoint*>(l->get());
156 	e->connect(peer->getEndpoint(e->name()));
157     }
158 
159     m_peer = peer;
160     peer->setPeer(this,reason,notify);
161     setDisconnect(0);
162     connected(reason);
163 
164     return true;
165 }
166 
disconnect(bool final,const char * reason,bool notify,const NamedList * params)167 bool CallEndpoint::disconnect(bool final, const char* reason, bool notify, const NamedList* params)
168 {
169     if (!m_peer)
170 	return false;
171     DDebug(DebugAll,"CallEndpoint '%s' disconnecting peer %p from [%p]",m_id.c_str(),m_peer,this);
172 
173     Lock lock(s_mutex,5000000);
174     if (!checkRetry(lock)) {
175 	TraceAlarm(traceId(),"engine","bug",DebugFail,"Call disconnect failed - timeout on call endpoint mutex owned by '%s'!",s_mutex.owner());
176 	Engine::restart(0);
177 	return false;
178     }
179 
180     CallEndpoint *temp = m_peer;
181     m_peer = 0;
182     m_lastPeer = 0;
183     if (!temp)
184 	return false;
185 
186     ObjList* l = m_data.skipNull();
187     for (; l; l=l->skipNext()) {
188 	DataEndpoint* e = static_cast<DataEndpoint*>(l->get());
189 	DDebug(DebugAll,"Endpoint at %p type '%s' peer %p",e,e->name().c_str(),e->getPeer());
190 	e->disconnect();
191     }
192 
193     temp->setPeer(0,reason,notify,params);
194     bool dead = !alive();
195     if (dead)
196 	TraceDebug(traceId(),DebugMild,"CallEndpoint '%s' disconnect called while dead [%p]",m_id.c_str(),this);
197     if (final)
198 	disconnected(true,reason);
199     lock.drop();
200     temp->deref();
201     return dead || deref();
202 }
203 
setPeer(CallEndpoint * peer,const char * reason,bool notify,const NamedList * params)204 void CallEndpoint::setPeer(CallEndpoint* peer, const char* reason, bool notify, const NamedList* params)
205 {
206     m_peer = peer;
207     if (m_peer) {
208 	setDisconnect(0);
209 	connected(reason);
210     }
211     else {
212 	m_lastPeer = 0;
213 	if (notify) {
214 	    setDisconnect(params);
215 	    disconnected(false,reason);
216 	}
217     }
218 }
219 
getPeerId(String & id) const220 bool CallEndpoint::getPeerId(String& id) const
221 {
222     id.clear();
223     if (!m_peer)
224 	return false;
225     if (m_peer == m_lastPeer) {
226 	Lock mylock(s_lastMutex);
227 	if (m_peer == m_lastPeer) {
228 	    id = m_lastPeerId;
229 	    return !id.null();
230 	}
231     }
232     Lock lock(s_mutex,5000000);
233     if (!checkRetry(lock)) {
234 	TraceAlarm(traceId(),"engine","bug",DebugFail,"Peer ID failed - timeout on call endpoint mutex owned by '%s'!",s_mutex.owner());
235 	Engine::restart(0);
236 	return false;
237     }
238     if (m_peer) {
239 	id = m_peer->id();
240 	return true;
241     }
242     else
243 	return false;
244 }
245 
getPeerId() const246 String CallEndpoint::getPeerId() const
247 {
248     String id;
249     getPeerId(id);
250     return id;
251 }
252 
getLastPeerId(String & id) const253 bool CallEndpoint::getLastPeerId(String& id) const
254 {
255     id.clear();
256     if (m_lastPeerId.null())
257 	return false;
258     s_lastMutex.lock();
259     id = m_lastPeerId;
260     s_lastMutex.unlock();
261     return !id.null();
262 }
263 
setLastPeerId()264 void CallEndpoint::setLastPeerId()
265 {
266     if (!m_peer)
267 	return;
268     if (m_peer == m_lastPeer)
269 	return;
270     Lock lock(s_mutex,5000000);
271     if (!checkRetry(lock)) {
272 	TraceAlarm(traceId(),"engine","bug",DebugCrit,"Set last peer ID failed - timeout on call endpoint mutex owned by '%s'!",s_mutex.owner());
273 	return;
274     }
275     if (m_peer) {
276 	s_lastMutex.lock();
277 	m_lastPeer = m_peer;
278 	m_lastPeerId = m_peer->id();
279 	s_lastMutex.unlock();
280     }
281 }
282 
getEndpoint(const String & type) const283 DataEndpoint* CallEndpoint::getEndpoint(const String& type) const
284 {
285     if (type.null())
286 	return 0;
287     const ObjList* pos = m_data.find(type);
288     return pos ? static_cast<DataEndpoint*>(pos->get()) : 0;
289 }
290 
setEndpoint(const String & type)291 DataEndpoint* CallEndpoint::setEndpoint(const String& type)
292 {
293     if (type.null() || (refcount() <= 0))
294 	return 0;
295     DataEndpoint* dat = getEndpoint(type);
296     if (!dat) {
297 	dat = new DataEndpoint(this,type);
298 	if (m_peer)
299 	    dat->connect(m_peer->getEndpoint(type));
300     }
301     return dat;
302 }
303 
setEndpoint(DataEndpoint * endPoint)304 void CallEndpoint::setEndpoint(DataEndpoint* endPoint)
305 {
306     if ((refcount() <= 0) || !(endPoint && endPoint->ref()))
307 	return;
308     if (m_data.find(endPoint)) {
309 	endPoint->deref();
310 	return;
311     }
312     clearEndpoint(endPoint->toString());
313     endPoint->disconnect();
314     m_data.append(endPoint);
315     if (m_peer)
316 	endPoint->connect(m_peer->getEndpoint(endPoint->toString()));
317 }
318 
clearEndpoint(const String & type)319 void CallEndpoint::clearEndpoint(const String& type)
320 {
321     if (type.null()) {
322 	ObjList* l = m_data.skipNull();
323 	for (; l; l=l->skipNext()) {
324 	    DataEndpoint* e = static_cast<DataEndpoint*>(l->get());
325 	    DDebug(DebugAll,"Endpoint at %p type '%s' peer %p",e,e->name().c_str(),e->getPeer());
326 	    e->disconnect();
327 	    e->clearCall(this);
328 	}
329 	m_data.clear();
330     }
331     else {
332 	DataEndpoint* dat = getEndpoint(type);
333 	if (dat) {
334 	    m_data.remove(dat,false);
335 	    dat->disconnect();
336 	    dat->clearCall(this);
337 	    dat->destruct();
338 	}
339     }
340 }
341 
setSource(DataSource * source,const String & type)342 void CallEndpoint::setSource(DataSource* source, const String& type)
343 {
344     DataEndpoint* dat = source ? setEndpoint(type) : getEndpoint(type);
345     if (RefObject::alive(dat))
346 	dat->setSource(source);
347 }
348 
getSource(const String & type) const349 DataSource* CallEndpoint::getSource(const String& type) const
350 {
351     DataEndpoint* dat = getEndpoint(type);
352     return RefObject::alive(dat) ? dat->getSource() : 0;
353 }
354 
setConsumer(DataConsumer * consumer,const String & type)355 void CallEndpoint::setConsumer(DataConsumer* consumer, const String& type)
356 {
357     DataEndpoint* dat = consumer ? setEndpoint(type) : getEndpoint(type);
358     if (RefObject::alive(dat))
359 	dat->setConsumer(consumer);
360 }
361 
getConsumer(const String & type) const362 DataConsumer* CallEndpoint::getConsumer(const String& type) const
363 {
364     DataEndpoint* dat = getEndpoint(type);
365     return RefObject::alive(dat) ? dat->getConsumer() : 0;
366 }
367 
clearData(DataNode * node,const String & type)368 bool CallEndpoint::clearData(DataNode* node, const String& type)
369 {
370     if (type.null() || !node)
371 	return false;
372     Lock mylock(DataEndpoint::commonMutex());
373     RefPointer<DataEndpoint> dat = getEndpoint(type);
374     return dat && dat->clearData(node);
375 }
376 
audioType()377 const String& CallEndpoint::audioType()
378 {
379     return s_audioType;
380 }
381 
382 
// Name of the message built by Channel::getDisconnect()
static const String s_disconnected("chan.disconnected");

// Mutex used to lock disconnect parameters during access
static Mutex s_paramMutex(true,"ChannelParams");

// Mutex used to protect channel data
Mutex Channel::s_chanDataMutex(false,"ChannelData");
390 
Channel(Driver * driver,const char * id,bool outgoing)391 Channel::Channel(Driver* driver, const char* id, bool outgoing)
392     : CallEndpoint(id),
393       m_parameters(""), m_chanParams(0), m_driver(driver), m_outgoing(outgoing),
394       m_timeout(0), m_maxcall(0), m_maxPDD(0), m_dtmfTime(0),
395       m_toutAns(0), m_dtmfSeq(0), m_answered(false)
396 {
397     init();
398 }
399 
Channel(Driver & driver,const char * id,bool outgoing)400 Channel::Channel(Driver& driver, const char* id, bool outgoing)
401     : CallEndpoint(id),
402       m_parameters(""), m_chanParams(0), m_driver(&driver), m_outgoing(outgoing),
403       m_timeout(0), m_maxcall(0), m_maxPDD(0), m_dtmfTime(0),
404       m_toutAns(0), m_dtmfSeq(0), m_answered(false)
405 {
406     init();
407 }
408 
~Channel()409 Channel::~Channel()
410 {
411 #ifdef DEBUG
412     Debugger debug(DebugAll,"Channel::~Channel()"," '%s' [%p]",id().c_str(),this);
413 #endif
414     cleanup();
415     TelEngine::destruct(m_chanParams);
416 }
417 
getObject(const String & name) const418 void* Channel::getObject(const String& name) const
419 {
420     if (name == YATOM("Channel"))
421 	return const_cast<Channel*>(this);
422     if (name == YATOM("MessageNotifier"))
423 	return static_cast<MessageNotifier*>(const_cast<Channel*>(this));
424     return CallEndpoint::getObject(name);
425 }
426 
paramMutex()427 Mutex& Channel::paramMutex()
428 {
429     return s_paramMutex;
430 }
431 
init()432 void Channel::init()
433 {
434     status(direction());
435     m_mutex = m_driver;
436     if (m_driver) {
437 	m_driver->lock();
438 	debugName(m_driver->debugName());
439 	debugChain(m_driver);
440 	if (id().null()) {
441 	    String tmp(m_driver->prefix());
442 	    tmp << m_driver->nextid();
443 	    setId(tmp);
444 	}
445 	m_driver->unlock();
446     }
447     // assign a new billid only to incoming calls
448     if (m_billid.null() && !m_outgoing)
449 	m_billid << Engine::runId() << "-" << allocId();
450     DDebug(this,DebugInfo,"Channel::init() '%s' [%p]",id().c_str(),this);
451 }
452 
cleanup()453 void Channel::cleanup()
454 {
455     m_timeout = 0;
456     m_maxcall = 0;
457     m_maxPDD = 0;
458     status("deleted");
459     m_targetid.clear();
460     dropChan();
461     m_driver = 0;
462     m_mutex = 0;
463 }
464 
filterDebug(const String & item)465 void Channel::filterDebug(const String& item)
466 {
467     if (m_driver) {
468 	if (m_driver->filterInstalled())
469 	    debugEnabled(m_driver->filterDebug(item));
470 	else
471 	    debugChain(m_driver);
472     }
473 }
474 
initChan()475 void Channel::initChan()
476 {
477     if (!m_driver)
478 	return;
479     Lock mylock(m_driver);
480 #ifndef NDEBUG
481     if (m_driver->channels().find(this)) {
482 	Debug(DebugCrit,"Channel '%s' already in list of '%s' driver [%p]",
483 	    id().c_str(),m_driver->name().c_str(),this);
484 	return;
485     }
486 #endif
487     m_driver->m_total++;
488     m_driver->m_chanCount++;
489     m_driver->channels().append(this);
490     m_driver->changed();
491 }
492 
dropChan()493 void Channel::dropChan()
494 {
495     if (!m_driver)
496 	return;
497     m_driver->lock();
498     if (!m_driver)
499 	TraceDebug(traceId(),DebugFail,"Driver lost in dropChan! [%p]",this);
500     if (m_driver->channels().remove(this,false)) {
501 	if (m_driver->m_chanCount > 0)
502 	    m_driver->m_chanCount--;
503 	m_driver->changed();
504     }
505     m_driver->unlock();
506 }
507 
zeroRefs()508 void Channel::zeroRefs()
509 {
510     // remove us from driver's list before calling the destructor
511     dropChan();
512     CallEndpoint::zeroRefs();
513 }
514 
connected(const char * reason)515 void Channel::connected(const char* reason)
516 {
517     CallEndpoint::connected(reason);
518     if (m_billid.null()) {
519 	Channel* peer = YOBJECT(Channel,getPeer());
520 	if (peer && peer->billid())
521 	    m_billid = peer->billid();
522     }
523     Message* m = message("chan.connected",false,true);
524     setLastPeerId();
525     if (reason)
526 	m->setParam("reason",reason);
527     if (!Engine::enqueue(m))
528 	TelEngine::destruct(m);
529 }
530 
disconnected(bool final,const char * reason)531 void Channel::disconnected(bool final, const char* reason)
532 {
533     if (final || Engine::exiting())
534 	return;
535     // last chance to get reconnected to something
536     Message* m = getDisconnect(reason);
537     s_paramMutex.lock();
538     m_targetid.clear();
539     m_parameters.clearParams();
540     s_paramMutex.unlock();
541     Engine::enqueue(m);
542 }
543 
setDisconnect(const NamedList * params)544 void Channel::setDisconnect(const NamedList* params)
545 {
546     DDebug(this,DebugInfo,"setDisconnect(%p) [%p]",params,this);
547     s_paramMutex.lock();
548     m_parameters.clearParams();
549     if (params)
550 	m_parameters.copyParams(*params);
551     s_paramMutex.unlock();
552 }
553 
endDisconnect(const Message & msg,bool handled)554 void Channel::endDisconnect(const Message& msg, bool handled)
555 {
556 }
557 
dispatched(const Message & msg,bool handled)558 void Channel::dispatched(const Message& msg, bool handled)
559 {
560     if (s_disconnected == msg)
561 	endDisconnect(msg,handled);
562 }
563 
setId(const char * newId)564 void Channel::setId(const char* newId)
565 {
566     debugName(0);
567     CallEndpoint::setId(newId);
568     debugName(id());
569 }
570 
getDisconnect(const char * reason)571 Message* Channel::getDisconnect(const char* reason)
572 {
573     Message* msg = new Message(s_disconnected);
574     s_paramMutex.lock();
575     msg->copyParams(m_parameters);
576     s_paramMutex.unlock();
577     complete(*msg);
578     if (reason)
579 	msg->setParam("reason",reason);
580     // we will remain referenced until the message is destroyed
581     msg->userData(this);
582     msg->setNotify();
583     return msg;
584 }
585 
status(const char * newstat)586 void Channel::status(const char* newstat)
587 {
588     Lock lck(chanDataMutex());
589     m_status = newstat;
590     if (!m_answered && (m_status == YSTRING("answered"))) {
591 	m_answered = true;
592 	// stop pre-answer timeout, restart answered timeout
593 	m_maxcall = 0;
594 	maxPDD(0);
595 	if (m_toutAns)
596 	    timeout(Time::now() + m_toutAns*(u_int64_t)1000);
597     }
598     else if (m_status == YSTRING("ringing") || m_status == YSTRING("progressing"))
599 	maxPDD(0);
600 }
601 
direction() const602 const char* Channel::direction() const
603 {
604     return m_outgoing ? "outgoing" : "incoming";
605 }
606 
setMaxcall(const Message * msg,int defTout)607 void Channel::setMaxcall(const Message* msg, int defTout)
608 {
609     int tout = msg ? msg->getIntValue(YSTRING("timeout"),defTout) : defTout;
610     if (tout > 0) {
611 	m_toutAns = tout;
612 	timeout(Time::now() + tout*(u_int64_t)1000);
613     }
614     else if (tout == 0) {
615 	m_toutAns = 0;
616 	timeout(0);
617     }
618     if (m_answered)
619 	maxcall(0);
620     else if (msg) {
621 	tout = msg->getIntValue(YSTRING("maxcall"),-1);
622 	if (tout > 0) {
623 	    timeout(0);
624 	    maxcall(Time::now() + tout*(u_int64_t)1000);
625 	}
626 	else if (tout == 0)
627 	    maxcall(0);
628     }
629 }
630 
setMaxPDD(const Message & msg)631 void Channel::setMaxPDD(const Message& msg)
632 {
633     if (m_answered) {
634 	maxPDD(0);
635 	return;
636     }
637     int tout = msg.getIntValue(YSTRING("maxpdd"),-1);
638     if (tout > 0)
639 	maxPDD(Time::now() + tout * (u_int64_t)1000);
640     else if (tout == 0)
641 	maxPDD(0);
642 }
643 
complete(Message & msg,bool minimal) const644 void Channel::complete(Message& msg, bool minimal) const
645 {
646     static const String s_hangup("chan.hangup");
647 
648     copyChanParams(msg);
649     msg.setParam("id",id());
650     if (traceId())
651 	msg.setParam("trace_id",traceId());
652     if (m_driver)
653 	msg.setParam("module",m_driver->name());
654     if (s_hangup == msg) {
655 	s_paramMutex.lock();
656 	msg.copyParams(parameters());
657 	s_paramMutex.unlock();
658     }
659 
660     if (minimal)
661 	return;
662 
663     String tmp;
664     if (getStatus(tmp,false))
665 	msg.setParam(YSTRING("status"),tmp);
666     if (m_address)
667 	msg.setParam("address",m_address);
668     if (m_targetid)
669 	msg.setParam("targetid",m_targetid);
670     if (m_billid)
671 	msg.setParam("billid",m_billid);
672     String peer;
673     if (getPeerId(peer))
674 	msg.setParam("peerid",peer);
675     if (getLastPeerId(peer))
676 	msg.setParam("lastpeerid",peer);
677     msg.setParam("answered",String::boolText(m_answered));
678     msg.setParam("direction",direction());
679 }
680 
message(const char * name,bool minimal,bool data)681 Message* Channel::message(const char* name, bool minimal, bool data)
682 {
683     Message* msg = new Message(name);
684     if (data)
685 	msg->userData(this);
686     complete(*msg,minimal);
687     return msg;
688 }
689 
message(const char * name,const NamedList * original,const char * params,bool minimal,bool data)690 Message* Channel::message(const char* name, const NamedList* original, const char* params, bool minimal, bool data)
691 {
692     Message* msg = message(name,minimal,data);
693     if (original) {
694 	if (!params)
695 	    params = original->getValue(s_copyParams);
696 	if (!null(params))
697 	    msg->copyParams(*original,params);
698     }
699     return msg;
700 }
701 
startRouter(Message * msg)702 bool Channel::startRouter(Message* msg)
703 {
704     if (!msg)
705 	return false;
706     if (m_driver) {
707 	Router* r = new Router(m_driver,id(),msg);
708 	if (r->startup())
709 	    return true;
710 	delete r;
711     }
712     else
713 	TelEngine::destruct(msg);
714     callRejected("failure","Internal server error");
715     // dereference and die if the channel is dynamic
716     if (m_driver && m_driver->varchan())
717 	deref();
718     return false;
719 }
720 
msgProgress(Message & msg)721 bool Channel::msgProgress(Message& msg)
722 {
723     status("progressing");
724     if (m_billid.null())
725 	m_billid = msg.getValue(YSTRING("billid"));
726     return true;
727 }
728 
msgRinging(Message & msg)729 bool Channel::msgRinging(Message& msg)
730 {
731     status("ringing");
732     if (m_billid.null())
733 	m_billid = msg.getValue(YSTRING("billid"));
734     return true;
735 }
736 
msgAnswered(Message & msg)737 bool Channel::msgAnswered(Message& msg)
738 {
739     m_maxcall = 0;
740     int tout = msg.getIntValue(YSTRING("timeout"),m_toutAns);
741     m_toutAns = (tout > 0) ? tout : 0;
742     status("answered");
743     m_answered = true;
744     if (m_billid.null())
745 	m_billid = msg.getValue(YSTRING("billid"));
746     return true;
747 }
748 
msgTone(Message & msg,const char * tone)749 bool Channel::msgTone(Message& msg, const char* tone)
750 {
751     return false;
752 }
753 
msgText(Message & msg,const char * text)754 bool Channel::msgText(Message& msg, const char* text)
755 {
756     return false;
757 }
758 
msgDrop(Message & msg,const char * reason)759 bool Channel::msgDrop(Message& msg, const char* reason)
760 {
761     m_timeout = m_maxcall = m_maxPDD = 0;
762     status(null(reason) ? "dropped" : reason);
763     disconnect(reason,msg);
764     return true;
765 }
766 
msgTransfer(Message & msg)767 bool Channel::msgTransfer(Message& msg)
768 {
769     return false;
770 }
771 
msgUpdate(Message & msg)772 bool Channel::msgUpdate(Message& msg)
773 {
774     return false;
775 }
776 
msgMasquerade(Message & msg)777 bool Channel::msgMasquerade(Message& msg)
778 {
779     if (m_billid.null())
780 	m_billid = msg.getValue(YSTRING("billid"));
781     if (msg == YSTRING("call.answered")) {
782 	TraceDebug(traceId(),this,DebugInfo,"Masquerading answer operation [%p]",this);
783 	m_maxcall = 0;
784 	maxPDD(0);
785 	Lock lck(chanDataMutex());
786 	m_status = "answered";
787     }
788     else if (msg == YSTRING("call.progress")) {
789 	TraceDebug(traceId(),this,DebugInfo,"Masquerading progress operation [%p]",this);
790 	status("progressing");
791     }
792     else if (msg == YSTRING("call.ringing")) {
793 	TraceDebug(traceId(),this,DebugInfo,"Masquerading ringing operation [%p]",this);
794 	status("ringing");
795     }
796     else if (msg == YSTRING("chan.dtmf")) {
797 	// add sequence, stop the message if it was a disallowed DTMF duplicate
798 	if (dtmfSequence(msg) && m_driver && !m_driver->m_dtmfDups) {
799 	    TraceDebug(traceId(),this,DebugNote,"Stopping duplicate '%s' DTMF '%s' [%p]",
800 		msg.getValue("detected"),msg.getValue("text"),this);
801 	    return true;
802 	}
803     }
804     return false;
805 }
806 
msgStatus(Message & msg)807 void Channel::msgStatus(Message& msg)
808 {
809     String par;
810     Lock lock(mutex());
811     complete(msg);
812     statusParams(par);
813     lock.drop();
814     msg.retValue().clear();
815     msg.retValue() << "name=" << id() << ",type=channel;" << par << "\r\n";
816 }
817 
818 // Control message handler that is invoked only for messages to this channel
819 // Find a data endpoint to process it
msgControl(Message & msg)820 bool Channel::msgControl(Message& msg)
821 {
822     setMaxcall(msg);
823     setMaxPDD(msg);
824     setChanParams(msg);
825     for (ObjList* o = m_data.skipNull(); o; o = o->skipNext()) {
826 	DataEndpoint* dep = static_cast<DataEndpoint*>(o->get());
827 	if (dep->control(msg))
828 	    return true;
829     }
830     return false;
831 }
832 
statusParams(String & str)833 void Channel::statusParams(String& str)
834 {
835     if (m_driver)
836 	str.append("module=",",") << m_driver->name();
837     String peer;
838     if (getPeerId(peer))
839 	str.append("peerid=",",") << peer;
840     str.append("status=",",");
841     getStatus(str);
842     str << ",direction=" << direction();
843     str << ",answered=" << m_answered;
844     str << ",targetid=" << m_targetid;
845     str << ",address=" << m_address;
846     str << ",billid=" << m_billid;
847     if (m_timeout || m_maxcall || m_maxPDD) {
848 	u_int64_t t = Time::now();
849 	if (m_timeout) {
850 	    str << ",timeout=";
851 	    if (m_timeout > t)
852 		str << (unsigned int)((m_timeout - t + 500) / 1000);
853 	    else
854 		str << "expired";
855 	}
856 	if (m_maxcall) {
857 	    str << ",maxcall=";
858 	    if (m_maxcall > t)
859 		str << (unsigned int)((m_maxcall - t + 500) / 1000);
860 	    else
861 		str << "expired";
862 	}
863 	if (m_maxPDD) {
864 	    str << ",maxpdd=";
865 	    if (m_maxPDD > t)
866 		str << (unsigned int)((m_maxPDD - t + 500) / 1000);
867 	    else
868 		str << "expired";
869 	}
870     }
871 }
872 
checkTimers(Message & msg,const Time & tmr)873 void Channel::checkTimers(Message& msg, const Time& tmr)
874 {
875     if (timeout() && (timeout() < tmr))
876 	msgDrop(msg,"timeout");
877     else if (maxcall() && (maxcall() < tmr))
878 	msgDrop(msg,"noanswer");
879     else if (maxPDD() && (maxPDD() < tmr))
880 	msgDrop(msg,"postdialdelay");
881 }
882 
callPrerouted(Message & msg,bool handled)883 bool Channel::callPrerouted(Message& msg, bool handled)
884 {
885     status("prerouted");
886     // accept a new billid at this stage
887     String* str = msg.getParam(YSTRING("billid"));
888     if (str)
889 	m_billid = *str;
890     setChanParams(msg,true);
891     return true;
892 }
893 
callRouted(Message & msg)894 bool Channel::callRouted(Message& msg)
895 {
896     status("routed");
897     if (m_billid.null())
898 	m_billid = msg.getValue(YSTRING("billid"));
899     setChanParams(msg,true);
900     return true;
901 }
902 
callAccept(Message & msg)903 void Channel::callAccept(Message& msg)
904 {
905     status("accepted");
906     int defTout = m_driver ? m_driver->timeout() : -1;
907     if (defTout <= 0)
908 	defTout = -1;
909     setMaxcall(msg,defTout);
910     setChanParams(msg,true);
911     if (m_billid.null())
912 	m_billid = msg.getValue(YSTRING("billid"));
913     m_targetid = msg.getValue(YSTRING("targetid"));
914     String detect = msg.getValue(YSTRING("tonedetect_in"));
915     if (detect && detect.toBoolean(true)) {
916 	if (detect.toBoolean(false))
917 	    detect = "tone/*";
918 	toneDetect(detect);
919     }
920     if (msg.getBoolValue(YSTRING("autoanswer")))
921 	msgAnswered(msg);
922     else if (msg.getBoolValue(YSTRING("autoring")))
923 	msgRinging(msg);
924     else if (msg.getBoolValue(YSTRING("autoprogress")))
925 	msgProgress(msg);
926     else if (m_targetid.null() && msg.getBoolValue(YSTRING("autoanswer"),true)) {
927 	// no preference exists in the message so issue a notice
928 	TraceDebug(traceId(),this,DebugNote,"Answering now call %s because we have no targetid [%p]",
929 	    id().c_str(),this);
930 	msgAnswered(msg);
931     }
932 }
933 
callConnect(Message & msg)934 void Channel::callConnect(Message& msg)
935 {
936     String detect = msg.getValue(YSTRING("tonedetect_out"));
937     if (detect && detect.toBoolean(true)) {
938 	if (detect.toBoolean(false))
939 	    detect = "tone/*";
940 	toneDetect(detect);
941     }
942 }
943 
callRejected(const char * error,const char * reason,const Message * msg)944 void Channel::callRejected(const char* error, const char* reason, const Message* msg)
945 {
946     TraceDebug(traceId(),this,DebugMild,"Call rejected error='%s' reason='%s' [%p]",error,reason,this);
947     if (msg) {
948 	const String* cp = msg->getParam(s_copyParams);
949 	if (!TelEngine::null(cp)) {
950 	    s_paramMutex.lock();
951 	    parameters().copyParams(*msg,*cp);
952 	    s_paramMutex.unlock();
953 	}
954 	setChanParams(*msg,true);
955     }
956     status("rejected");
957 }
958 
dtmfSequence(Message & msg)959 bool Channel::dtmfSequence(Message& msg)
960 {
961     if ((msg != YSTRING("chan.dtmf")) || msg.getParam(YSTRING("sequence")))
962 	return false;
963     bool duplicate = false;
964     const String* detected = msg.getParam(YSTRING("detected"));
965     const String* text = msg.getParam(YSTRING("text"));
966     Lock lock(mutex());
967     unsigned int seq = m_dtmfSeq;
968     if (text && detected &&
969 	(*text == m_dtmfText) && (*detected != m_dtmfDetected) &&
970 	(msg.msgTime() < m_dtmfTime))
971 	duplicate = true;
972     else {
973 	seq = ++m_dtmfSeq;
974 	m_dtmfTime = msg.msgTime() + 4000000;
975 	m_dtmfText = text;
976 	m_dtmfDetected = detected;
977     }
978     // need to add sequence number used to detect reorders
979     msg.addParam("sequence",String(seq));
980     msg.addParam("duplicate",String::boolText(duplicate));
981     return duplicate;
982 }
983 
dtmfEnqueue(Message * msg)984 bool Channel::dtmfEnqueue(Message* msg)
985 {
986     if (!msg)
987 	return false;
988     if (dtmfSequence(*msg) && m_driver && !m_driver->m_dtmfDups) {
989 	TraceDebug(traceId(),this,DebugNote,"Dropping duplicate '%s' DTMF '%s' [%p]",
990 	    msg->getValue("detected"),msg->getValue("text"),this);
991 	TelEngine::destruct(msg);
992 	return false;
993     }
994     return Engine::enqueue(msg);
995 }
996 
dtmfInband(const char * tone)997 bool Channel::dtmfInband(const char* tone)
998 {
999     if (null(tone))
1000 	return false;
1001     Message m("chan.attach");
1002     complete(m,true);
1003     m.userData(this);
1004     String tmp("tone/dtmfstr/");
1005     tmp += tone;
1006     m.setParam("override",tmp);
1007     m.setParam("single","yes");
1008     return Engine::dispatch(m);
1009 }
1010 
toneDetect(const char * sniffer)1011 bool Channel::toneDetect(const char* sniffer)
1012 {
1013     if (null(sniffer))
1014 	sniffer = "tone/*";
1015     Message m("chan.attach");
1016     complete(m,true);
1017     m.userData(this);
1018     m.setParam("sniffer",sniffer);
1019     m.setParam("single","yes");
1020     return Engine::dispatch(m);
1021 }
1022 
setDebug(Message & msg)1023 bool Channel::setDebug(Message& msg)
1024 {
1025     String str = msg.getValue("line");
1026     if (str.startSkip("level")) {
1027 	int dbg = debugLevel();
1028 	str >> dbg;
1029 	if (str == "+") {
1030 	    if (debugLevel() > dbg)
1031 		dbg = debugLevel();
1032 	}
1033 	else if (str == "-") {
1034 	    if (debugLevel() < dbg)
1035 		dbg = debugLevel();
1036 	}
1037 	debugLevel(dbg);
1038     }
1039     else if (str == "reset")
1040 	debugChain(m_driver);
1041     else if (str == "engine")
1042 	debugCopy();
1043     else if (str.isBoolean())
1044 	debugEnabled(str.toBoolean(debugEnabled()));
1045     msg.retValue() << "Channel " << id()
1046 	<< " debug " << (debugEnabled() ? "on" : "off")
1047 	<< " level " << debugLevel() << (debugChained() ? " chained" : "") << "\r\n";
1048     return true;
1049 }
1050 
allocId()1051 unsigned int Channel::allocId()
1052 {
1053     s_callidMutex.lock();
1054     unsigned int id = ++s_callid;
1055     s_callidMutex.unlock();
1056     return id;
1057 }
1058 
// Dictionary mapping message names to the relay IDs of class Module
// Used by messageName() and by installRelay() to convert between the two
// The list ends with a null entry
TokenDict Module::s_messages[] = {
    { "engine.status",   Module::Status },
    { "engine.timer",    Module::Timer },
    { "engine.debug",    Module::Level },
    { "engine.command",  Module::Command },
    { "engine.help",     Module::Help },
    { "engine.halt",     Module::Halt },
    { "engine.stop",     Module::Stop },
    { "call.route",      Module::Route },
    { "call.execute",    Module::Execute },
    { "call.drop",       Module::Drop },
    { "call.progress",   Module::Progress },
    { "call.ringing",    Module::Ringing },
    { "call.answered",   Module::Answered },
    { "call.update",     Module::Update },
    { "chan.dtmf",       Module::Tone },
    { "chan.text",       Module::Text },
    { "chan.masquerade", Module::Masquerade },
    { "chan.locate",     Module::Locate },
    { "chan.transfer",   Module::Transfer },
    { "chan.control",	 Module::Control },
    { "msg.execute",     Module::MsgExecute },
    { 0, 0 }
};

// Delay between a change and the update it triggers; changed() multiplies
//  it by 1000000 before adding it to Time::now() (presumably seconds)
// A value of zero makes changed() do nothing
unsigned int Module::s_delay = 5;
1085 
messageName(int id)1086 const char* Module::messageName(int id)
1087 {
1088     if ((id <= 0) || (id >PubLast))
1089 	return 0;
1090     return lookup(id,s_messages);
1091 }
1092 
Module(const char * name,const char * type,bool earlyInit)1093 Module::Module(const char* name, const char* type, bool earlyInit)
1094     : Plugin(name,earlyInit), Mutex(true,"Module"),
1095       m_init(false), m_relays(0), m_type(type), m_changed(0)
1096 {
1097 }
1098 
~Module()1099 Module::~Module()
1100 {
1101 }
1102 
getObject(const String & name) const1103 void* Module::getObject(const String& name) const
1104 {
1105     if (name == YATOM("Module"))
1106 	return const_cast<Module*>(this);
1107     return Plugin::getObject(name);
1108 }
1109 
installRelay(int id,const char * name,unsigned priority)1110 bool Module::installRelay(int id, const char* name, unsigned priority)
1111 {
1112     if (!(id && name && priority))
1113 	return false;
1114 
1115     TempObjectCounter cnt(objectsCounter(),true);
1116     Lock lock(this);
1117     if (m_relays & id)
1118 	return true;
1119     m_relays |= id;
1120 
1121     MessageRelay* relay = new MessageRelay(name,this,id,priority,Module::name());
1122     m_relayList.append(relay)->setDelete(false);
1123     Engine::install(relay);
1124     return true;
1125 }
1126 
installRelay(int id,unsigned priority)1127 bool Module::installRelay(int id, unsigned priority)
1128 {
1129     return installRelay(id,messageName(id),priority);
1130 }
1131 
installRelay(const char * name,unsigned priority)1132 bool Module::installRelay(const char* name, unsigned priority)
1133 {
1134     return installRelay(lookup(name,s_messages),name,priority);
1135 }
1136 
installRelay(MessageRelay * relay)1137 bool Module::installRelay(MessageRelay* relay)
1138 {
1139     if (!relay || ((relay->id() & m_relays) != 0) || m_relayList.find(relay))
1140 	return false;
1141     m_relays |= relay->id();
1142     m_relayList.append(relay)->setDelete(false);
1143     Engine::install(relay);
1144     return true;
1145 }
1146 
uninstallRelay(MessageRelay * relay,bool delRelay)1147 bool Module::uninstallRelay(MessageRelay* relay, bool delRelay)
1148 {
1149     if (!relay || ((relay->id() & m_relays) == 0) || !m_relayList.remove(relay,false))
1150 	return false;
1151     Engine::uninstall(relay);
1152     m_relays &= ~relay->id();
1153     if (delRelay)
1154 	TelEngine::destruct(relay);
1155     return true;
1156 }
1157 
uninstallRelay(int id,bool delRelay)1158 bool Module::uninstallRelay(int id, bool delRelay)
1159 {
1160     if ((id & m_relays) == 0)
1161 	return false;
1162     for (ObjList* l = m_relayList.skipNull(); l; l = l->skipNext()) {
1163 	MessageRelay* r = static_cast<MessageRelay*>(l->get());
1164 	if (r->id() != id)
1165 	    continue;
1166 	Engine::uninstall(r);
1167 	m_relays &= ~id;
1168 	l->remove(delRelay);
1169 	break;
1170     }
1171     return false;
1172 }
1173 
1174 
uninstallRelays()1175 bool Module::uninstallRelays()
1176 {
1177     while (MessageRelay* relay = static_cast<MessageRelay*>(m_relayList.remove(false))) {
1178 	Engine::uninstall(relay);
1179 	m_relays &= ~relay->id();
1180 	relay->destruct();
1181     }
1182     return (0 == m_relays) && (0 == m_relayList.count());
1183 }
1184 
initialize()1185 void Module::initialize()
1186 {
1187     setup();
1188 }
1189 
setup()1190 void Module::setup()
1191 {
1192     DDebug(this,DebugAll,"Module::setup()");
1193     if (m_init)
1194 	return;
1195     m_init = true;
1196     installRelay(Timer,90);
1197     installRelay(Status,110);
1198     installRelay(Level,120);
1199     installRelay(Command,120);
1200 }
1201 
changed()1202 void Module::changed()
1203 {
1204     if (s_delay && !m_changed)
1205 	m_changed = Time::now() + s_delay*(u_int64_t)1000000;
1206 }
1207 
msgTimer(Message & msg)1208 void Module::msgTimer(Message& msg)
1209 {
1210     if (m_changed && (msg.msgTime() > m_changed)) {
1211 	Message* m = new Message("module.update");
1212 	m->addParam("module",name());
1213 	m_changed = 0;
1214 	genUpdate(*m);
1215 	Engine::enqueue(m);
1216     }
1217 }
1218 
msgRoute(Message & msg)1219 bool Module::msgRoute(Message& msg)
1220 {
1221     return false;
1222 }
1223 
msgCommand(Message & msg)1224 bool Module::msgCommand(Message& msg)
1225 {
1226     const NamedString* line = msg.getParam(YSTRING("line"));
1227     if (line)
1228 	return commandExecute(msg.retValue(),*line);
1229     if (msg.getParam(YSTRING("partline")) || msg.getParam(YSTRING("partword")))
1230 	return commandComplete(msg,msg.getValue(YSTRING("partline")),msg.getValue(YSTRING("partword")));
1231     return false;
1232 }
1233 
commandExecute(String & retVal,const String & line)1234 bool Module::commandExecute(String& retVal, const String& line)
1235 {
1236     return false;
1237 }
1238 
commandComplete(Message & msg,const String & partLine,const String & partWord)1239 bool Module::commandComplete(Message& msg, const String& partLine, const String& partWord)
1240 {
1241     if ((partLine == YSTRING("debug")) || (partLine == YSTRING("status")))
1242 	itemComplete(msg.retValue(),name(),partWord);
1243     return false;
1244 }
1245 
itemComplete(String & itemList,const String & item,const String & partWord)1246 bool Module::itemComplete(String& itemList, const String& item, const String& partWord)
1247 {
1248     if (partWord.null() || item.startsWith(partWord)) {
1249 	itemList.append(item,"\t");
1250 	return true;
1251     }
1252     return false;
1253 }
1254 
msgStatus(Message & msg)1255 void Module::msgStatus(Message& msg)
1256 {
1257     String mod, par, det;
1258     bool details = msg.getBoolValue(YSTRING("details"),true);
1259     lock();
1260     statusModule(mod);
1261     statusParams(par);
1262     if (details)
1263 	statusDetail(det);
1264     unlock();
1265     msg.retValue() << mod << ";" << par;
1266     if (det)
1267 	msg.retValue() << ";" << det;
1268     msg.retValue() << "\r\n";
1269 }
1270 
statusModule(String & str)1271 void Module::statusModule(String& str)
1272 {
1273     str.append("name=",",") << name();
1274     if (m_type)
1275 	str << ",type=" << m_type;
1276 }
1277 
statusParams(String & str)1278 void Module::statusParams(String& str)
1279 {
1280 }
1281 
statusDetail(String & str)1282 void Module::statusDetail(String& str)
1283 {
1284 }
1285 
genUpdate(Message & msg)1286 void Module::genUpdate(Message& msg)
1287 {
1288 }
1289 
received(Message & msg,int id)1290 bool Module::received(Message &msg, int id)
1291 {
1292     if (name().null())
1293 	return false;
1294 
1295     switch (id) {
1296 	case Timer:
1297 	    lock();
1298 	    msgTimer(msg);
1299 	    unlock();
1300 	    return false;
1301 	case Route:
1302 	    return msgRoute(msg);
1303     }
1304 
1305     String dest = msg.getValue(YSTRING("module"));
1306 
1307     if (id == Status) {
1308 	if (dest == name()) {
1309 	    msgStatus(msg);
1310 	    return true;
1311 	}
1312 	if (dest.null() || (dest == m_type))
1313 	    msgStatus(msg);
1314 	return false;
1315     }
1316     else if (id == Level)
1317 	return setDebug(msg,dest);
1318     else if (id == Command)
1319 	return msgCommand(msg);
1320 
1321     return false;
1322 }
1323 
setDebug(Message & msg,const String & target)1324 bool Module::setDebug(Message& msg, const String& target)
1325 {
1326     if (target != name())
1327 	return false;
1328 
1329     NamedCounter* counter = objectsCounter();
1330     String str = msg.getValue("line");
1331     if (str.startSkip("level")) {
1332 	int dbg = debugLevel();
1333 	str >> dbg;
1334 	if (str == "+") {
1335 	    if (debugLevel() > dbg)
1336 		dbg = debugLevel();
1337 	}
1338 	else if (str == "-") {
1339 	    if (debugLevel() < dbg)
1340 		dbg = debugLevel();
1341 	}
1342 	debugLevel(dbg);
1343     }
1344     else if (str == "reset") {
1345 	debugLevel(TelEngine::debugLevel());
1346 	debugEnabled(true);
1347 	if (counter)
1348 	    counter->enable(getObjCounting());
1349     }
1350     else if (str.startSkip("objects")) {
1351 	bool dbg = (str == "reset") ? getObjCounting() : (counter && counter->enabled());
1352 	str >> dbg;
1353 	if (counter)
1354 	    counter->enable(dbg);
1355     }
1356     else if (str.startSkip("filter"))
1357 	m_filter = str;
1358     else {
1359 	bool dbg = debugEnabled();
1360 	str >> dbg;
1361 	debugEnabled(dbg);
1362     }
1363     msg.retValue() << "Module " << name()
1364 	<< " debug " << (debugEnabled() ? "on" : "off")
1365 	<< " level " << debugLevel()
1366 	<< " objects " << ((counter && counter->enabled()) ? "on" : "off");
1367     if (m_filter)
1368 	msg.retValue() << " filter: " << m_filter;
1369     msg.retValue() << "\r\n";
1370     return true;
1371 }
1372 
filterDebug(const String & item) const1373 bool Module::filterDebug(const String& item) const
1374 {
1375     return m_filter.null() ? debugEnabled() : m_filter.matches(item);
1376 }
1377 
1378 
Driver(const char * name,const char * type)1379 Driver::Driver(const char* name, const char* type)
1380     : Module(name,type),
1381       m_init(false), m_varchan(true),
1382       m_routing(0), m_routed(0), m_total(0),
1383       m_nextid(0), m_timeout(0),
1384       m_maxroute(0), m_maxchans(0), m_chanCount(0),
1385       m_dtmfDups(false), m_doExpire(true)
1386 {
1387     m_prefix << name << "/";
1388 }
1389 
getObject(const String & name) const1390 void* Driver::getObject(const String& name) const
1391 {
1392     if (name == YATOM("Driver"))
1393 	return const_cast<Driver*>(this);
1394     return Module::getObject(name);
1395 }
1396 
initialize()1397 void Driver::initialize()
1398 {
1399     setup();
1400 }
1401 
setup(const char * prefix,bool minimal)1402 void Driver::setup(const char* prefix, bool minimal)
1403 {
1404     DDebug(this,DebugAll,"Driver::setup('%s',%d)",prefix,minimal);
1405     Module::setup();
1406     loadLimits();
1407     if (m_init)
1408 	return;
1409     m_init = true;
1410     m_prefix = prefix ? prefix : name().c_str();
1411     if (m_prefix && !m_prefix.endsWith("/"))
1412 	m_prefix += "/";
1413     XDebug(DebugAll,"setup name='%s' prefix='%s'",name().c_str(),m_prefix.c_str());
1414     installRelay(Masquerade,10);
1415     installRelay(Locate,40);
1416     installRelay(Drop,60);
1417     installRelay(Execute,90);
1418     installRelay(Control,90);
1419     if (minimal)
1420 	return;
1421     installRelay(Tone);
1422     installRelay(Text);
1423     installRelay(Ringing);
1424     installRelay(Answered);
1425 }
1426 
isBusy() const1427 bool Driver::isBusy() const
1428 {
1429     return (m_routing || m_chanCount);
1430 }
1431 
find(const String & id) const1432 Channel* Driver::find(const String& id) const
1433 {
1434     const ObjList* pos = m_chans.find(id);
1435     return pos ? static_cast<Channel*>(pos->get()) : 0;
1436 }
1437 
// Relay entry point of the driver
// Timer, Halt, Execute and driver wide Drop / Status requests are handled
//  here or by Module::received(); anything else is passed to the channel
//  whose ID is found in the message and starts with the driver prefix
// msg: the received message, id: the ID of the relay that received it
// Returns true if the message was handled by the driver or by the channel
bool Driver::received(Message &msg, int id)
{
    // nothing is handled while the prefix is empty
    if (!m_prefix)
	return false;
    // pick destination depending on message type
    String dest;
    switch (id) {
	case Timer:
	    // m_doExpire is cleared while the channels are checked so another
	    //  timer arriving meanwhile skips the check; the lock attempt is
	    //  bounded by 950000 (presumably microseconds)
	    if (m_doExpire && lock(950000)) {
		// test again, the flag may have changed until the lock was taken
		if (m_doExpire) {
		    m_doExpire = false;
		    // check each channel for timeouts
		    ListIterator iter(m_chans);
		    Time t;
		    for (;;) {
			// get the next channel while locked, check it unlocked
			RefPointer<Channel> c = static_cast<Channel*>(iter.get());
			unlock();
			if (!c)
			    break;
			c->checkTimers(msg,t);
			// let go of the channel before locking again
			c = 0;
			lock();
		    }
		    m_doExpire = true;
		}
		else
		    unlock();
	    }
	    return Module::received(msg,id);
	case Status:
	    // check if it's a channel status request
	    dest = msg.getValue(YSTRING("module"));
	    if (dest.startsWith(m_prefix))
		break;
	    // fall through
	case Level:
	case Route:
	case Command:
	    return Module::received(msg,id);
	case Halt:
	    dropAll(msg);
	    return false;
	case Execute:
	    dest = msg.getValue(YSTRING("callto"));
	    break;
	case Drop:
	case Masquerade:
	case Locate:
	    dest = msg.getValue(YSTRING("id"));
	    break;
	default:
	    dest = msg.getValue(YSTRING("peerid"));
	    // if this channel is not the peer, try to match it as target
	    if (!dest.startsWith(m_prefix))
		dest = msg.getValue(YSTRING("targetid"));
	    break;
    }
    XDebug(DebugAll,"id=%d prefix='%s' dest='%s'",id,m_prefix.c_str(),dest.c_str());

    // a drop without target or aimed at the driver name or type drops all
    //  channels, but only a request by driver name counts as handled
    if (id == Drop) {
	bool exact = (dest == name());
	if (exact || dest.null() || (dest == type())) {
	    dropAll(msg);
	    return exact;
	}
    }

    // handle call.execute which should start a new channel
    if (id == Execute) {
	if (!canAccept(false))
	    return false;
	// startSkip() also removes the matched prefix from dest
	if (dest.startSkip(m_prefix,false) ||
	    (dest.startSkip("line/",false) && hasLine(msg.getValue(YSTRING("line"))))) {
	    if (msg.getBoolValue(YSTRING("stop_call"),false) && !canStopCall()) {
		msg.setParam(YSTRING("error"),"stopped_call");
		return false;
	    }
	    return msgExecute(msg,dest);
	}
	return false;
    }

    // check if the message was for this driver
    if (!dest.startsWith(m_prefix))
	return false;

    // look up the channel while locked, the RefPointer is used after unlocking
    lock();
    RefPointer<Channel> chan = find(dest);
    unlock();
    if (!chan) {
	DDebug(this,DebugMild,"Could not find channel '%s'",dest.c_str());
	return false;
    }

    // pass the message to the matching method of the channel
    switch (id) {
	case Status:
	    chan->msgStatus(msg);
	    return true;
	// the next three are delivered only to incoming, not answered channels
	case Progress:
	    return chan->isIncoming() && !chan->isAnswered() && chan->msgProgress(msg);
	case Ringing:
	    return chan->isIncoming() && !chan->isAnswered() && chan->msgRinging(msg);
	case Answered:
	    return chan->isIncoming() && !chan->isAnswered() && chan->msgAnswered(msg);
	case Tone:
	    return chan->msgTone(msg,msg.getValue("text"));
	case Text:
	    return chan->msgText(msg,msg.getValue("text"));
	case Drop:
	    return chan->msgDrop(msg,msg.getValue("reason"));
	case Transfer:
	    return chan->msgTransfer(msg);
	case Update:
	    return chan->msgUpdate(msg);
	case Masquerade:
	    // the message takes the value of its own "message" parameter,
	    //  the parameter is removed and the channel is set as user data
	    msg = msg.getValue(YSTRING("message"));
	    msg.clearParam(YSTRING("message"));
	    msg.userData(chan);
	    if (chan->msgMasquerade(msg))
		return true;
	    // not handled by the channel, complete it and leave it unhandled
	    chan->complete(msg,msg.getBoolValue(YSTRING("complete_minimal"),false));
	    return false;
	case Locate:
	    msg.userData(chan);
	    return true;
	case Control:
	    return chan->msgControl(msg);
    }
    return false;
}
1568 
// Drop all the channels of this driver
// msg: the message that requested the drop, its "reason" parameter is
//  passed to each channel's msgDrop()
void Driver::dropAll(Message &msg)
{
    const char* reason = msg.getValue(YSTRING("reason"));
    lock();
    ListIterator iter(m_chans);
    for (;;) {
	// get the next channel while locked, drop it while unlocked
	RefPointer<Channel> c = static_cast<Channel*>(iter.get());
	unlock();
	// the driver is already unlocked when the loop ends here
	if (!c)
	    break;
	DDebug(this,DebugAll,"Dropping %s channel '%s' @%p [%p]",
	    name().c_str(),c->id().c_str(),static_cast<Channel*>(c),this);
	c->msgDrop(msg,reason);
	// let go of the channel before locking again
	c = 0;
	lock();
    }
}
1586 
canAccept(bool routers)1587 bool Driver::canAccept(bool routers)
1588 {
1589     if (Engine::exiting())
1590 	return false;
1591     if (routers && !canRoute())
1592 	return false;
1593     if (m_maxchans)
1594 	return (m_chanCount < m_maxchans);
1595     return true;
1596 }
1597 
canRoute()1598 bool Driver::canRoute()
1599 {
1600     if (Engine::exiting() || (Engine::accept() >= Engine::Congestion))
1601 	return false;
1602     if (m_maxroute && (m_routing >= m_maxroute))
1603 	return false;
1604     return true;
1605 }
1606 
hasLine(const String & line) const1607 bool Driver::hasLine(const String& line) const
1608 {
1609     return false;
1610 }
1611 
msgRoute(Message & msg)1612 bool Driver::msgRoute(Message& msg)
1613 {
1614     String called = msg.getValue(YSTRING("called"));
1615     if (called.null())
1616 	return false;
1617     String line = msg.getValue(YSTRING("line"));
1618     if (line.null())
1619 	line = msg.getValue(YSTRING("account"));
1620     if (line && hasLine(line)) {
1621 	// asked to route to a line we have locally
1622 	msg.setParam("line",line);
1623 	msg.retValue() = prefix() + called;
1624 	return true;
1625     }
1626     return Module::msgRoute(msg);
1627 }
1628 
genUpdate(Message & msg)1629 void Driver::genUpdate(Message& msg)
1630 {
1631     msg.addParam("routed",String(m_routed));
1632     msg.addParam("routing",String(m_routing));
1633     msg.addParam("total",String(m_total));
1634     msg.addParam("chans",String(m_chanCount));
1635 }
1636 
statusModule(String & str)1637 void Driver::statusModule(String& str)
1638 {
1639     Module::statusModule(str);
1640     str.append("format=Status|Address|Peer",",");
1641 }
1642 
statusParams(String & str)1643 void Driver::statusParams(String& str)
1644 {
1645     Module::statusParams(str);
1646     str.append("routed=",",") << m_routed;
1647     str << ",routing=" << m_routing;
1648     str << ",total=" << m_total;
1649     str << ",chans=" << m_chanCount;
1650 }
1651 
statusDetail(String & str)1652 void Driver::statusDetail(String& str)
1653 {
1654     ObjList* l = m_chans.skipNull();
1655     for (; l; l=l->skipNext()) {
1656 	Channel* c = static_cast<Channel*>(l->get());
1657 	str.append(c->id(),",") << "=";
1658 	c->getStatus(str);
1659 	str << "|" << String::uriEscape(c->address(),",;|"," +?&")
1660 	    << "|" << c->getPeerId();
1661     }
1662 }
1663 
commandComplete(Message & msg,const String & partLine,const String & partWord)1664 bool Driver::commandComplete(Message& msg, const String& partLine, const String& partWord)
1665 {
1666     bool ok = false;
1667     bool listChans = String(msg.getValue(YSTRING("complete"))) == YSTRING("channels");
1668     if (listChans && (partWord.null() || name().startsWith(partWord)))
1669 	msg.retValue().append(name(),"\t");
1670     else
1671 	ok = Module::commandComplete(msg,partLine,partWord);
1672     lock();
1673     unsigned int nchans = m_chans.count();
1674     unlock();
1675     if (nchans && listChans) {
1676 	if (name().startsWith(partWord)) {
1677 	    msg.retValue().append(prefix(),"\t");
1678 	    return ok;
1679 	}
1680 	if (partWord.startsWith(prefix()))
1681 	    ok = true;
1682 	lock();
1683 	ObjList* l = m_chans.skipNull();
1684 	for (; l; l=l->skipNext()) {
1685 	    Channel* c = static_cast<Channel*>(l->get());
1686 	    if (c->id().startsWith(partWord))
1687 		msg.retValue().append(c->id(),"\t");
1688 	}
1689 	unlock();
1690     }
1691     return ok;
1692 }
1693 
setDebug(Message & msg,const String & target)1694 bool Driver::setDebug(Message& msg, const String& target)
1695 {
1696     if (!target.startsWith(m_prefix))
1697 	return Module::setDebug(msg,target);
1698 
1699     Lock lock(this);
1700     Channel* chan = find(target);
1701     if (chan)
1702 	return chan->setDebug(msg);
1703 
1704     return false;
1705 }
1706 
loadLimits()1707 void Driver::loadLimits()
1708 {
1709     timeout(Engine::config().getIntValue(YSTRING("telephony"),"timeout"));
1710     maxRoute(Engine::config().getIntValue(YSTRING("telephony"),"maxroute"));
1711     maxChans(Engine::config().getIntValue(YSTRING("telephony"),"maxchans"));
1712     dtmfDups(Engine::config().getBoolValue(YSTRING("telephony"),"dtmfdups"));
1713 }
1714 
nextid()1715 unsigned int Driver::nextid()
1716 {
1717     Lock lock(this);
1718     return ++m_nextid;
1719 }
1720 
1721 
Router(Driver * driver,const char * id,Message * msg)1722 Router::Router(Driver* driver, const char* id, Message* msg)
1723     : Thread("Call Router"), m_driver(driver), m_id(id), m_msg(msg)
1724 {
1725     if (driver)
1726 	setObjCounter(driver->objectsCounter());
1727 }
1728 
run()1729 void Router::run()
1730 {
1731     if (!(m_driver && m_msg))
1732 	return;
1733     m_driver->lock();
1734     m_driver->m_routing++;
1735     m_driver->changed();
1736     m_driver->unlock();
1737     bool ok = route();
1738     m_driver->lock();
1739     m_driver->m_routing--;
1740     if (ok)
1741 	m_driver->m_routed++;
1742     m_driver->changed();
1743     m_driver->unlock();
1744 }
1745 
// Route the call described by the message of this thread
// A message that already has a "callto" is executed directly, otherwise it
//  is dispatched as call.preroute (if that is its name) and call.route; a
//  successfully routed message is then dispatched as call.execute
// The channel is looked up by ID after each dispatch and the call is
//  rejected through the channel whenever a step fails
// Returns true if the call was routed and executed
bool Router::route()
{
    DDebug(m_driver,DebugAll,"Routing thread for '%s' [%p]",m_id.c_str(),this);

    RefPointer<Channel> chan;
    String tmp(m_msg->getValue(YSTRING("callto")));
    bool ok = !tmp.null();
    if (ok)
	// the target is already known, use it as routing result
	m_msg->retValue() = tmp;
    else {
	if (*m_msg == YSTRING("call.preroute")) {
	    ok = Engine::dispatch(m_msg);
	    // the channel is looked up only after the dispatch returned
	    m_driver->lock();
	    chan = m_driver->find(m_id);
	    m_driver->unlock();
	    if (!chan) {
		Debug(m_driver,DebugInfo,"Connection '%s' vanished while prerouting!",m_id.c_str());
		return false;
	    }
	    // copy to the channel the parameters listed in "copyparams"
	    const String* cp = m_msg->getParam(s_copyParams);
	    if (!TelEngine::null(cp)) {
		Channel::paramMutex().lock();
		chan->parameters().copyParams(*m_msg,*cp);
		Channel::paramMutex().unlock();
	    }
	    // a handled preroute returning "-" or "error" rejects the call
	    bool dropCall = ok && ((m_msg->retValue() == YSTRING("-")) || (m_msg->retValue() == YSTRING("error")));
	    if (dropCall)
		chan->callRejected(m_msg->getValue(YSTRING("error"),"unknown"),
		    m_msg->getValue(YSTRING("reason")),m_msg);
	    else
		dropCall = !chan->callPrerouted(*m_msg,ok);
	    if (dropCall) {
		// get rid of the dynamic chans
		if (m_driver->varchan())
		    chan->deref();
		return false;
	    }
	    // release the channel and reuse the message as call.route
	    chan = 0;
	    *m_msg = "call.route";
	    m_msg->retValue().clear();
	    if (Engine::trackParam())
		m_msg->clearParam(Engine::trackParam());
	    m_msg->msgTime() = Time::now();
	}
	ok = Engine::dispatch(m_msg);
    }

    // look up the channel again, it may be gone after the dispatch
    m_driver->lock();
    chan = m_driver->find(m_id);
    m_driver->unlock();

    if (!chan) {
	Debug(m_driver,DebugInfo,"Connection '%s' vanished while routing!",m_id.c_str());
	return false;
    }
    // chan will keep it referenced even if message user data is changed
    m_msg->userData(chan);

    // default errors, compared by address below to tell when they were used
    static const char s_noroute[] = "noroute";
    static const char s_looping[] = "looping";
    static const char s_noconn[] = "noconn";

    if (ok && m_msg->retValue().trimSpaces()) {
	if ((m_msg->retValue() == YSTRING("-")) || (m_msg->retValue() == YSTRING("error")))
	    chan->callRejected(m_msg->getValue(YSTRING("error"),"unknown"),
		m_msg->getValue("reason"),m_msg);
	else if (m_msg->getIntValue(YSTRING("antiloop"),1) <= 0) {
	    // the "antiloop" value reached zero, reject the call
	    const char* error = m_msg->getValue(YSTRING("error"),s_looping);
	    chan->callRejected(error,m_msg->getValue(YSTRING("reason"),
		((s_looping == error) ? "Call is looping" : (const char*)0)),m_msg);
	}
	else if (chan->callRouted(*m_msg)) {
	    // reuse the message as call.execute with the route as "callto"
	    *m_msg = "call.execute";
	    m_msg->setParam("callto",m_msg->retValue());
	    m_msg->clearParam(YSTRING("error"));
	    m_msg->retValue().clear();
	    if (Engine::trackParam())
		m_msg->clearParam(Engine::trackParam());
	    m_msg->msgTime() = Time::now();
	    ok = Engine::dispatch(m_msg);
	    if (ok)
		chan->callAccept(*m_msg);
	    else {
		const char* error = m_msg->getValue(YSTRING("error"),s_noconn);
		const char* reason = m_msg->getValue(YSTRING("reason"),
		    ((s_noconn == error) ? "Could not connect to target" : (const char*)0));
		// dispatch a disconnect notification with "reroute" set, the
		//  call is rejected only if that message is not handled
		Message m(s_disconnected);
		const String* cp = m_msg->getParam(s_copyParams);
		if (!TelEngine::null(cp))
		    m.copyParams(*m_msg,*cp);
		chan->complete(m);
		m.setParam("error",error);
		m.setParam("reason",reason);
		m.setParam("reroute",String::boolText(true));
		m.userData(chan);
		m.setNotify();
		if (!Engine::dispatch(m))
		    chan->callRejected(error,reason,m_msg);
	    }
	}
    }
    else {
	// not routed or the route is empty
	const char* error = m_msg->getValue(YSTRING("error"),s_noroute);
	chan->callRejected(error,m_msg->getValue(YSTRING("reason"),
	    ((s_noroute == error) ? "No route to call target" : (const char*)0)),m_msg);
    }

    // dereference again if the channel is dynamic
    if (m_driver->varchan())
	chan->deref();
    return ok;
}
1858 
cleanup()1859 void Router::cleanup()
1860 {
1861     destruct(m_msg);
1862 }
1863 
1864 
pickAccountParams(const NamedList & params)1865 void CallAccount::pickAccountParams(const NamedList& params)
1866 {
1867     NamedIterator iter(params);
1868     Lock mylock(m_mutex);
1869     m_inbParams.clearParams();
1870     m_outParams.clearParams();
1871     m_regParams.clearParams();
1872     while (const NamedString* n = iter.get()) {
1873 	if (n->name().length() <= 4)
1874 	    continue;
1875 	String name = n->name().substr(4).trimSpaces();
1876 	if (n->name().startsWith("reg:"))
1877 	    m_regParams.setParam(name,*n);
1878 	else if (n->name().startsWith("inb:"))
1879 	    m_inbParams.setParam(name,*n);
1880 	else if (n->name().startsWith("out:"))
1881 	    m_outParams.setParam(name,*n);
1882     }
1883 }
1884 
setInboundParams(NamedList & params)1885 void CallAccount::setInboundParams(NamedList& params)
1886 {
1887     Lock mylock(m_mutex);
1888     NamedIterator iter(m_inbParams);
1889     while (const NamedString* n = iter.get()) {
1890 	String tmp(*n);
1891 	params.replaceParams(tmp);
1892 	params.setParam(n->name(),tmp);
1893     }
1894 }
1895 
setOutboundParams(NamedList & params)1896 void CallAccount::setOutboundParams(NamedList& params)
1897 {
1898     Lock mylock(m_mutex);
1899     NamedIterator iter(m_outParams);
1900     while (const NamedString* n = iter.get()) {
1901 	String tmp(*n);
1902 	params.replaceParams(tmp);
1903 	params.setParam(n->name(),tmp);
1904     }
1905 }
1906 
setRegisterParams(NamedList & params)1907 void CallAccount::setRegisterParams(NamedList& params)
1908 {
1909     Lock mylock(m_mutex);
1910     NamedIterator iter(m_regParams);
1911     while (const NamedString* n = iter.get()) {
1912 	String tmp(*n);
1913 	params.replaceParams(tmp);
1914 	params.setParam(n->name(),tmp);
1915     }
1916 }
1917 
1918 /* vi: set ts=8 sw=4 sts=4 noet: */
1919