1 /**
2 * Channel.cpp
3 * This file is part of the YATE Project http://YATE.null.ro
4 *
5 * Yet Another Telephony Engine - a fully featured software PBX and IVR
6 * Copyright (C) 2004-2014 Null Team
7 *
8 * This software is distributed under multiple licenses;
9 * see the COPYING file in the main directory for licensing
10 * information for this specific distribution.
11 *
12 * This use of this software may be subject to additional restrictions.
13 * See the LEGAL file in the main directory for details.
14 *
15 * This program is distributed in the hope that it will be useful,
16 * but WITHOUT ANY WARRANTY; without even the implied warranty of
17 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
18 */
19
20 #include "yatephone.h"
21
22 #include <string.h>
23 #include <stdlib.h>
24
25 using namespace TelEngine;
26
// Tell whether a string looks like an E.164 phone number: an optional
// leading '+' followed by one or more characters out of 0-9, '*' and '#'
bool TelEngine::isE164(const char* str)
{
    if (!str)
        return false;
    // a single leading '+' is accepted and skipped
    if ('+' == *str)
        ++str;
    // an empty string or a lone '+' is not a number
    if ('\0' == *str)
        return false;
    for (const char* p = str; *p; ++p) {
        const char c = *p;
        const bool digit = (c >= '0') && (c <= '9');
        if (!(digit || ('*' == c) || ('#' == c)))
            return false;
    }
    // reached the terminator without meeting an invalid character
    return true;
}
60
61 static unsigned int s_callid = 0;
62 static Mutex s_callidMutex(false,"CallID");
63
64 // this is to protect against two threads trying to (dis)connect a pair
65 // of call endpoints at the same time
66 static Mutex s_mutex(true,"CallEndpoint");
67 static Mutex s_lastMutex(false,"CallEndpoint::last");
68 static const String s_audioType = "audio";
69 static const String s_copyParams = "copyparams";
70
71 // Check if a Lock taken on the common mutex succeeded, wait up to 55s more in congestion
checkRetry(Lock & lock)72 static bool checkRetry(Lock& lock)
73 {
74 if (lock.locked())
75 return true;
76 Engine::setCongestion("Call endpoint mutex busy");
77 bool ok = lock.acquire(s_mutex,55000000);
78 Engine::setCongestion();
79 return ok;
80 }
81
82
CallEndpoint(const char * id)83 CallEndpoint::CallEndpoint(const char* id)
84 : m_peer(0), m_lastPeer(0), m_id(id), m_mutex(0)
85 {
86 }
87
destroyed()88 void CallEndpoint::destroyed()
89 {
90 #ifdef DEBUG
91 ObjList* l = m_data.skipNull();
92 for (; l; l=l->skipNext()) {
93 DataEndpoint* e = static_cast<DataEndpoint*>(l->get());
94 Debug(DebugAll,"Endpoint at %p type '%s' refcount=%d",e,e->name().c_str(),e->refcount());
95 }
96 #endif
97 disconnect(true,0,true,0);
98 clearEndpoint();
99 m_lastPeer = 0;
100 }
101
commonMutex()102 Mutex& CallEndpoint::commonMutex()
103 {
104 return s_mutex;
105 }
106
getObject(const String & name) const107 void* CallEndpoint::getObject(const String& name) const
108 {
109 if (name == YATOM("CallEndpoint"))
110 return const_cast<CallEndpoint*>(this);
111 return RefObject::getObject(name);
112 }
113
setId(const char * newId)114 void CallEndpoint::setId(const char* newId)
115 {
116 m_id = newId;
117 }
118
connect(CallEndpoint * peer,const char * reason,bool notify)119 bool CallEndpoint::connect(CallEndpoint* peer, const char* reason, bool notify)
120 {
121 if (!peer) {
122 disconnect(reason,notify);
123 return false;
124 }
125 if (peer == m_peer)
126 return true;
127 if (peer == this) {
128 TraceDebug(traceId(),DebugWarn,"CallEndpoint '%s' trying to connect to itself! [%p]",m_id.c_str(),this);
129 return false;
130 }
131 DDebug(DebugAll,"CallEndpoint '%s' connecting peer %p to [%p]",m_id.c_str(),peer,this);
132
133 #if 0
134 Lock lock(s_mutex,5000000);
135 if (!checkRetry(lock)) {
136 Alarm("engine","bug",DebugFail,"Call connect failed - timeout on call endpoint mutex owned by '%s'!",s_mutex.owner());
137 Engine::restart(0);
138 return false;
139 }
140 #endif
141
142 // are we already dead?
143 if (!ref())
144 return false;
145 disconnect(reason,notify);
146 // is our intended peer dead?
147 if (!peer->ref()) {
148 deref();
149 return false;
150 }
151 peer->disconnect(reason,notify);
152
153 ObjList* l = m_data.skipNull();
154 for (; l; l=l->skipNext()) {
155 DataEndpoint* e = static_cast<DataEndpoint*>(l->get());
156 e->connect(peer->getEndpoint(e->name()));
157 }
158
159 m_peer = peer;
160 peer->setPeer(this,reason,notify);
161 setDisconnect(0);
162 connected(reason);
163
164 return true;
165 }
166
disconnect(bool final,const char * reason,bool notify,const NamedList * params)167 bool CallEndpoint::disconnect(bool final, const char* reason, bool notify, const NamedList* params)
168 {
169 if (!m_peer)
170 return false;
171 DDebug(DebugAll,"CallEndpoint '%s' disconnecting peer %p from [%p]",m_id.c_str(),m_peer,this);
172
173 Lock lock(s_mutex,5000000);
174 if (!checkRetry(lock)) {
175 TraceAlarm(traceId(),"engine","bug",DebugFail,"Call disconnect failed - timeout on call endpoint mutex owned by '%s'!",s_mutex.owner());
176 Engine::restart(0);
177 return false;
178 }
179
180 CallEndpoint *temp = m_peer;
181 m_peer = 0;
182 m_lastPeer = 0;
183 if (!temp)
184 return false;
185
186 ObjList* l = m_data.skipNull();
187 for (; l; l=l->skipNext()) {
188 DataEndpoint* e = static_cast<DataEndpoint*>(l->get());
189 DDebug(DebugAll,"Endpoint at %p type '%s' peer %p",e,e->name().c_str(),e->getPeer());
190 e->disconnect();
191 }
192
193 temp->setPeer(0,reason,notify,params);
194 bool dead = !alive();
195 if (dead)
196 TraceDebug(traceId(),DebugMild,"CallEndpoint '%s' disconnect called while dead [%p]",m_id.c_str(),this);
197 if (final)
198 disconnected(true,reason);
199 lock.drop();
200 temp->deref();
201 return dead || deref();
202 }
203
setPeer(CallEndpoint * peer,const char * reason,bool notify,const NamedList * params)204 void CallEndpoint::setPeer(CallEndpoint* peer, const char* reason, bool notify, const NamedList* params)
205 {
206 m_peer = peer;
207 if (m_peer) {
208 setDisconnect(0);
209 connected(reason);
210 }
211 else {
212 m_lastPeer = 0;
213 if (notify) {
214 setDisconnect(params);
215 disconnected(false,reason);
216 }
217 }
218 }
219
getPeerId(String & id) const220 bool CallEndpoint::getPeerId(String& id) const
221 {
222 id.clear();
223 if (!m_peer)
224 return false;
225 if (m_peer == m_lastPeer) {
226 Lock mylock(s_lastMutex);
227 if (m_peer == m_lastPeer) {
228 id = m_lastPeerId;
229 return !id.null();
230 }
231 }
232 Lock lock(s_mutex,5000000);
233 if (!checkRetry(lock)) {
234 TraceAlarm(traceId(),"engine","bug",DebugFail,"Peer ID failed - timeout on call endpoint mutex owned by '%s'!",s_mutex.owner());
235 Engine::restart(0);
236 return false;
237 }
238 if (m_peer) {
239 id = m_peer->id();
240 return true;
241 }
242 else
243 return false;
244 }
245
getPeerId() const246 String CallEndpoint::getPeerId() const
247 {
248 String id;
249 getPeerId(id);
250 return id;
251 }
252
getLastPeerId(String & id) const253 bool CallEndpoint::getLastPeerId(String& id) const
254 {
255 id.clear();
256 if (m_lastPeerId.null())
257 return false;
258 s_lastMutex.lock();
259 id = m_lastPeerId;
260 s_lastMutex.unlock();
261 return !id.null();
262 }
263
setLastPeerId()264 void CallEndpoint::setLastPeerId()
265 {
266 if (!m_peer)
267 return;
268 if (m_peer == m_lastPeer)
269 return;
270 Lock lock(s_mutex,5000000);
271 if (!checkRetry(lock)) {
272 TraceAlarm(traceId(),"engine","bug",DebugCrit,"Set last peer ID failed - timeout on call endpoint mutex owned by '%s'!",s_mutex.owner());
273 return;
274 }
275 if (m_peer) {
276 s_lastMutex.lock();
277 m_lastPeer = m_peer;
278 m_lastPeerId = m_peer->id();
279 s_lastMutex.unlock();
280 }
281 }
282
getEndpoint(const String & type) const283 DataEndpoint* CallEndpoint::getEndpoint(const String& type) const
284 {
285 if (type.null())
286 return 0;
287 const ObjList* pos = m_data.find(type);
288 return pos ? static_cast<DataEndpoint*>(pos->get()) : 0;
289 }
290
setEndpoint(const String & type)291 DataEndpoint* CallEndpoint::setEndpoint(const String& type)
292 {
293 if (type.null() || (refcount() <= 0))
294 return 0;
295 DataEndpoint* dat = getEndpoint(type);
296 if (!dat) {
297 dat = new DataEndpoint(this,type);
298 if (m_peer)
299 dat->connect(m_peer->getEndpoint(type));
300 }
301 return dat;
302 }
303
setEndpoint(DataEndpoint * endPoint)304 void CallEndpoint::setEndpoint(DataEndpoint* endPoint)
305 {
306 if ((refcount() <= 0) || !(endPoint && endPoint->ref()))
307 return;
308 if (m_data.find(endPoint)) {
309 endPoint->deref();
310 return;
311 }
312 clearEndpoint(endPoint->toString());
313 endPoint->disconnect();
314 m_data.append(endPoint);
315 if (m_peer)
316 endPoint->connect(m_peer->getEndpoint(endPoint->toString()));
317 }
318
clearEndpoint(const String & type)319 void CallEndpoint::clearEndpoint(const String& type)
320 {
321 if (type.null()) {
322 ObjList* l = m_data.skipNull();
323 for (; l; l=l->skipNext()) {
324 DataEndpoint* e = static_cast<DataEndpoint*>(l->get());
325 DDebug(DebugAll,"Endpoint at %p type '%s' peer %p",e,e->name().c_str(),e->getPeer());
326 e->disconnect();
327 e->clearCall(this);
328 }
329 m_data.clear();
330 }
331 else {
332 DataEndpoint* dat = getEndpoint(type);
333 if (dat) {
334 m_data.remove(dat,false);
335 dat->disconnect();
336 dat->clearCall(this);
337 dat->destruct();
338 }
339 }
340 }
341
setSource(DataSource * source,const String & type)342 void CallEndpoint::setSource(DataSource* source, const String& type)
343 {
344 DataEndpoint* dat = source ? setEndpoint(type) : getEndpoint(type);
345 if (RefObject::alive(dat))
346 dat->setSource(source);
347 }
348
getSource(const String & type) const349 DataSource* CallEndpoint::getSource(const String& type) const
350 {
351 DataEndpoint* dat = getEndpoint(type);
352 return RefObject::alive(dat) ? dat->getSource() : 0;
353 }
354
setConsumer(DataConsumer * consumer,const String & type)355 void CallEndpoint::setConsumer(DataConsumer* consumer, const String& type)
356 {
357 DataEndpoint* dat = consumer ? setEndpoint(type) : getEndpoint(type);
358 if (RefObject::alive(dat))
359 dat->setConsumer(consumer);
360 }
361
getConsumer(const String & type) const362 DataConsumer* CallEndpoint::getConsumer(const String& type) const
363 {
364 DataEndpoint* dat = getEndpoint(type);
365 return RefObject::alive(dat) ? dat->getConsumer() : 0;
366 }
367
clearData(DataNode * node,const String & type)368 bool CallEndpoint::clearData(DataNode* node, const String& type)
369 {
370 if (type.null() || !node)
371 return false;
372 Lock mylock(DataEndpoint::commonMutex());
373 RefPointer<DataEndpoint> dat = getEndpoint(type);
374 return dat && dat->clearData(node);
375 }
376
audioType()377 const String& CallEndpoint::audioType()
378 {
379 return s_audioType;
380 }
381
382
383 static const String s_disconnected("chan.disconnected");
384
385 // Mutex used to lock disconnect parameters during access
386 static Mutex s_paramMutex(true,"ChannelParams");
387
388 // Mutex used to protect channel data
389 Mutex Channel::s_chanDataMutex(false,"ChannelData");
390
Channel(Driver * driver,const char * id,bool outgoing)391 Channel::Channel(Driver* driver, const char* id, bool outgoing)
392 : CallEndpoint(id),
393 m_parameters(""), m_chanParams(0), m_driver(driver), m_outgoing(outgoing),
394 m_timeout(0), m_maxcall(0), m_maxPDD(0), m_dtmfTime(0),
395 m_toutAns(0), m_dtmfSeq(0), m_answered(false)
396 {
397 init();
398 }
399
Channel(Driver & driver,const char * id,bool outgoing)400 Channel::Channel(Driver& driver, const char* id, bool outgoing)
401 : CallEndpoint(id),
402 m_parameters(""), m_chanParams(0), m_driver(&driver), m_outgoing(outgoing),
403 m_timeout(0), m_maxcall(0), m_maxPDD(0), m_dtmfTime(0),
404 m_toutAns(0), m_dtmfSeq(0), m_answered(false)
405 {
406 init();
407 }
408
~Channel()409 Channel::~Channel()
410 {
411 #ifdef DEBUG
412 Debugger debug(DebugAll,"Channel::~Channel()"," '%s' [%p]",id().c_str(),this);
413 #endif
414 cleanup();
415 TelEngine::destruct(m_chanParams);
416 }
417
getObject(const String & name) const418 void* Channel::getObject(const String& name) const
419 {
420 if (name == YATOM("Channel"))
421 return const_cast<Channel*>(this);
422 if (name == YATOM("MessageNotifier"))
423 return static_cast<MessageNotifier*>(const_cast<Channel*>(this));
424 return CallEndpoint::getObject(name);
425 }
426
paramMutex()427 Mutex& Channel::paramMutex()
428 {
429 return s_paramMutex;
430 }
431
init()432 void Channel::init()
433 {
434 status(direction());
435 m_mutex = m_driver;
436 if (m_driver) {
437 m_driver->lock();
438 debugName(m_driver->debugName());
439 debugChain(m_driver);
440 if (id().null()) {
441 String tmp(m_driver->prefix());
442 tmp << m_driver->nextid();
443 setId(tmp);
444 }
445 m_driver->unlock();
446 }
447 // assign a new billid only to incoming calls
448 if (m_billid.null() && !m_outgoing)
449 m_billid << Engine::runId() << "-" << allocId();
450 DDebug(this,DebugInfo,"Channel::init() '%s' [%p]",id().c_str(),this);
451 }
452
cleanup()453 void Channel::cleanup()
454 {
455 m_timeout = 0;
456 m_maxcall = 0;
457 m_maxPDD = 0;
458 status("deleted");
459 m_targetid.clear();
460 dropChan();
461 m_driver = 0;
462 m_mutex = 0;
463 }
464
filterDebug(const String & item)465 void Channel::filterDebug(const String& item)
466 {
467 if (m_driver) {
468 if (m_driver->filterInstalled())
469 debugEnabled(m_driver->filterDebug(item));
470 else
471 debugChain(m_driver);
472 }
473 }
474
initChan()475 void Channel::initChan()
476 {
477 if (!m_driver)
478 return;
479 Lock mylock(m_driver);
480 #ifndef NDEBUG
481 if (m_driver->channels().find(this)) {
482 Debug(DebugCrit,"Channel '%s' already in list of '%s' driver [%p]",
483 id().c_str(),m_driver->name().c_str(),this);
484 return;
485 }
486 #endif
487 m_driver->m_total++;
488 m_driver->m_chanCount++;
489 m_driver->channels().append(this);
490 m_driver->changed();
491 }
492
dropChan()493 void Channel::dropChan()
494 {
495 if (!m_driver)
496 return;
497 m_driver->lock();
498 if (!m_driver)
499 TraceDebug(traceId(),DebugFail,"Driver lost in dropChan! [%p]",this);
500 if (m_driver->channels().remove(this,false)) {
501 if (m_driver->m_chanCount > 0)
502 m_driver->m_chanCount--;
503 m_driver->changed();
504 }
505 m_driver->unlock();
506 }
507
zeroRefs()508 void Channel::zeroRefs()
509 {
510 // remove us from driver's list before calling the destructor
511 dropChan();
512 CallEndpoint::zeroRefs();
513 }
514
connected(const char * reason)515 void Channel::connected(const char* reason)
516 {
517 CallEndpoint::connected(reason);
518 if (m_billid.null()) {
519 Channel* peer = YOBJECT(Channel,getPeer());
520 if (peer && peer->billid())
521 m_billid = peer->billid();
522 }
523 Message* m = message("chan.connected",false,true);
524 setLastPeerId();
525 if (reason)
526 m->setParam("reason",reason);
527 if (!Engine::enqueue(m))
528 TelEngine::destruct(m);
529 }
530
disconnected(bool final,const char * reason)531 void Channel::disconnected(bool final, const char* reason)
532 {
533 if (final || Engine::exiting())
534 return;
535 // last chance to get reconnected to something
536 Message* m = getDisconnect(reason);
537 s_paramMutex.lock();
538 m_targetid.clear();
539 m_parameters.clearParams();
540 s_paramMutex.unlock();
541 Engine::enqueue(m);
542 }
543
setDisconnect(const NamedList * params)544 void Channel::setDisconnect(const NamedList* params)
545 {
546 DDebug(this,DebugInfo,"setDisconnect(%p) [%p]",params,this);
547 s_paramMutex.lock();
548 m_parameters.clearParams();
549 if (params)
550 m_parameters.copyParams(*params);
551 s_paramMutex.unlock();
552 }
553
endDisconnect(const Message & msg,bool handled)554 void Channel::endDisconnect(const Message& msg, bool handled)
555 {
556 }
557
dispatched(const Message & msg,bool handled)558 void Channel::dispatched(const Message& msg, bool handled)
559 {
560 if (s_disconnected == msg)
561 endDisconnect(msg,handled);
562 }
563
setId(const char * newId)564 void Channel::setId(const char* newId)
565 {
566 debugName(0);
567 CallEndpoint::setId(newId);
568 debugName(id());
569 }
570
getDisconnect(const char * reason)571 Message* Channel::getDisconnect(const char* reason)
572 {
573 Message* msg = new Message(s_disconnected);
574 s_paramMutex.lock();
575 msg->copyParams(m_parameters);
576 s_paramMutex.unlock();
577 complete(*msg);
578 if (reason)
579 msg->setParam("reason",reason);
580 // we will remain referenced until the message is destroyed
581 msg->userData(this);
582 msg->setNotify();
583 return msg;
584 }
585
status(const char * newstat)586 void Channel::status(const char* newstat)
587 {
588 Lock lck(chanDataMutex());
589 m_status = newstat;
590 if (!m_answered && (m_status == YSTRING("answered"))) {
591 m_answered = true;
592 // stop pre-answer timeout, restart answered timeout
593 m_maxcall = 0;
594 maxPDD(0);
595 if (m_toutAns)
596 timeout(Time::now() + m_toutAns*(u_int64_t)1000);
597 }
598 else if (m_status == YSTRING("ringing") || m_status == YSTRING("progressing"))
599 maxPDD(0);
600 }
601
direction() const602 const char* Channel::direction() const
603 {
604 return m_outgoing ? "outgoing" : "incoming";
605 }
606
setMaxcall(const Message * msg,int defTout)607 void Channel::setMaxcall(const Message* msg, int defTout)
608 {
609 int tout = msg ? msg->getIntValue(YSTRING("timeout"),defTout) : defTout;
610 if (tout > 0) {
611 m_toutAns = tout;
612 timeout(Time::now() + tout*(u_int64_t)1000);
613 }
614 else if (tout == 0) {
615 m_toutAns = 0;
616 timeout(0);
617 }
618 if (m_answered)
619 maxcall(0);
620 else if (msg) {
621 tout = msg->getIntValue(YSTRING("maxcall"),-1);
622 if (tout > 0) {
623 timeout(0);
624 maxcall(Time::now() + tout*(u_int64_t)1000);
625 }
626 else if (tout == 0)
627 maxcall(0);
628 }
629 }
630
setMaxPDD(const Message & msg)631 void Channel::setMaxPDD(const Message& msg)
632 {
633 if (m_answered) {
634 maxPDD(0);
635 return;
636 }
637 int tout = msg.getIntValue(YSTRING("maxpdd"),-1);
638 if (tout > 0)
639 maxPDD(Time::now() + tout * (u_int64_t)1000);
640 else if (tout == 0)
641 maxPDD(0);
642 }
643
complete(Message & msg,bool minimal) const644 void Channel::complete(Message& msg, bool minimal) const
645 {
646 static const String s_hangup("chan.hangup");
647
648 copyChanParams(msg);
649 msg.setParam("id",id());
650 if (traceId())
651 msg.setParam("trace_id",traceId());
652 if (m_driver)
653 msg.setParam("module",m_driver->name());
654 if (s_hangup == msg) {
655 s_paramMutex.lock();
656 msg.copyParams(parameters());
657 s_paramMutex.unlock();
658 }
659
660 if (minimal)
661 return;
662
663 String tmp;
664 if (getStatus(tmp,false))
665 msg.setParam(YSTRING("status"),tmp);
666 if (m_address)
667 msg.setParam("address",m_address);
668 if (m_targetid)
669 msg.setParam("targetid",m_targetid);
670 if (m_billid)
671 msg.setParam("billid",m_billid);
672 String peer;
673 if (getPeerId(peer))
674 msg.setParam("peerid",peer);
675 if (getLastPeerId(peer))
676 msg.setParam("lastpeerid",peer);
677 msg.setParam("answered",String::boolText(m_answered));
678 msg.setParam("direction",direction());
679 }
680
message(const char * name,bool minimal,bool data)681 Message* Channel::message(const char* name, bool minimal, bool data)
682 {
683 Message* msg = new Message(name);
684 if (data)
685 msg->userData(this);
686 complete(*msg,minimal);
687 return msg;
688 }
689
message(const char * name,const NamedList * original,const char * params,bool minimal,bool data)690 Message* Channel::message(const char* name, const NamedList* original, const char* params, bool minimal, bool data)
691 {
692 Message* msg = message(name,minimal,data);
693 if (original) {
694 if (!params)
695 params = original->getValue(s_copyParams);
696 if (!null(params))
697 msg->copyParams(*original,params);
698 }
699 return msg;
700 }
701
startRouter(Message * msg)702 bool Channel::startRouter(Message* msg)
703 {
704 if (!msg)
705 return false;
706 if (m_driver) {
707 Router* r = new Router(m_driver,id(),msg);
708 if (r->startup())
709 return true;
710 delete r;
711 }
712 else
713 TelEngine::destruct(msg);
714 callRejected("failure","Internal server error");
715 // dereference and die if the channel is dynamic
716 if (m_driver && m_driver->varchan())
717 deref();
718 return false;
719 }
720
msgProgress(Message & msg)721 bool Channel::msgProgress(Message& msg)
722 {
723 status("progressing");
724 if (m_billid.null())
725 m_billid = msg.getValue(YSTRING("billid"));
726 return true;
727 }
728
msgRinging(Message & msg)729 bool Channel::msgRinging(Message& msg)
730 {
731 status("ringing");
732 if (m_billid.null())
733 m_billid = msg.getValue(YSTRING("billid"));
734 return true;
735 }
736
msgAnswered(Message & msg)737 bool Channel::msgAnswered(Message& msg)
738 {
739 m_maxcall = 0;
740 int tout = msg.getIntValue(YSTRING("timeout"),m_toutAns);
741 m_toutAns = (tout > 0) ? tout : 0;
742 status("answered");
743 m_answered = true;
744 if (m_billid.null())
745 m_billid = msg.getValue(YSTRING("billid"));
746 return true;
747 }
748
msgTone(Message & msg,const char * tone)749 bool Channel::msgTone(Message& msg, const char* tone)
750 {
751 return false;
752 }
753
msgText(Message & msg,const char * text)754 bool Channel::msgText(Message& msg, const char* text)
755 {
756 return false;
757 }
758
msgDrop(Message & msg,const char * reason)759 bool Channel::msgDrop(Message& msg, const char* reason)
760 {
761 m_timeout = m_maxcall = m_maxPDD = 0;
762 status(null(reason) ? "dropped" : reason);
763 disconnect(reason,msg);
764 return true;
765 }
766
msgTransfer(Message & msg)767 bool Channel::msgTransfer(Message& msg)
768 {
769 return false;
770 }
771
msgUpdate(Message & msg)772 bool Channel::msgUpdate(Message& msg)
773 {
774 return false;
775 }
776
msgMasquerade(Message & msg)777 bool Channel::msgMasquerade(Message& msg)
778 {
779 if (m_billid.null())
780 m_billid = msg.getValue(YSTRING("billid"));
781 if (msg == YSTRING("call.answered")) {
782 TraceDebug(traceId(),this,DebugInfo,"Masquerading answer operation [%p]",this);
783 m_maxcall = 0;
784 maxPDD(0);
785 Lock lck(chanDataMutex());
786 m_status = "answered";
787 }
788 else if (msg == YSTRING("call.progress")) {
789 TraceDebug(traceId(),this,DebugInfo,"Masquerading progress operation [%p]",this);
790 status("progressing");
791 }
792 else if (msg == YSTRING("call.ringing")) {
793 TraceDebug(traceId(),this,DebugInfo,"Masquerading ringing operation [%p]",this);
794 status("ringing");
795 }
796 else if (msg == YSTRING("chan.dtmf")) {
797 // add sequence, stop the message if it was a disallowed DTMF duplicate
798 if (dtmfSequence(msg) && m_driver && !m_driver->m_dtmfDups) {
799 TraceDebug(traceId(),this,DebugNote,"Stopping duplicate '%s' DTMF '%s' [%p]",
800 msg.getValue("detected"),msg.getValue("text"),this);
801 return true;
802 }
803 }
804 return false;
805 }
806
msgStatus(Message & msg)807 void Channel::msgStatus(Message& msg)
808 {
809 String par;
810 Lock lock(mutex());
811 complete(msg);
812 statusParams(par);
813 lock.drop();
814 msg.retValue().clear();
815 msg.retValue() << "name=" << id() << ",type=channel;" << par << "\r\n";
816 }
817
818 // Control message handler that is invoked only for messages to this channel
819 // Find a data endpoint to process it
msgControl(Message & msg)820 bool Channel::msgControl(Message& msg)
821 {
822 setMaxcall(msg);
823 setMaxPDD(msg);
824 setChanParams(msg);
825 for (ObjList* o = m_data.skipNull(); o; o = o->skipNext()) {
826 DataEndpoint* dep = static_cast<DataEndpoint*>(o->get());
827 if (dep->control(msg))
828 return true;
829 }
830 return false;
831 }
832
statusParams(String & str)833 void Channel::statusParams(String& str)
834 {
835 if (m_driver)
836 str.append("module=",",") << m_driver->name();
837 String peer;
838 if (getPeerId(peer))
839 str.append("peerid=",",") << peer;
840 str.append("status=",",");
841 getStatus(str);
842 str << ",direction=" << direction();
843 str << ",answered=" << m_answered;
844 str << ",targetid=" << m_targetid;
845 str << ",address=" << m_address;
846 str << ",billid=" << m_billid;
847 if (m_timeout || m_maxcall || m_maxPDD) {
848 u_int64_t t = Time::now();
849 if (m_timeout) {
850 str << ",timeout=";
851 if (m_timeout > t)
852 str << (unsigned int)((m_timeout - t + 500) / 1000);
853 else
854 str << "expired";
855 }
856 if (m_maxcall) {
857 str << ",maxcall=";
858 if (m_maxcall > t)
859 str << (unsigned int)((m_maxcall - t + 500) / 1000);
860 else
861 str << "expired";
862 }
863 if (m_maxPDD) {
864 str << ",maxpdd=";
865 if (m_maxPDD > t)
866 str << (unsigned int)((m_maxPDD - t + 500) / 1000);
867 else
868 str << "expired";
869 }
870 }
871 }
872
checkTimers(Message & msg,const Time & tmr)873 void Channel::checkTimers(Message& msg, const Time& tmr)
874 {
875 if (timeout() && (timeout() < tmr))
876 msgDrop(msg,"timeout");
877 else if (maxcall() && (maxcall() < tmr))
878 msgDrop(msg,"noanswer");
879 else if (maxPDD() && (maxPDD() < tmr))
880 msgDrop(msg,"postdialdelay");
881 }
882
callPrerouted(Message & msg,bool handled)883 bool Channel::callPrerouted(Message& msg, bool handled)
884 {
885 status("prerouted");
886 // accept a new billid at this stage
887 String* str = msg.getParam(YSTRING("billid"));
888 if (str)
889 m_billid = *str;
890 setChanParams(msg,true);
891 return true;
892 }
893
callRouted(Message & msg)894 bool Channel::callRouted(Message& msg)
895 {
896 status("routed");
897 if (m_billid.null())
898 m_billid = msg.getValue(YSTRING("billid"));
899 setChanParams(msg,true);
900 return true;
901 }
902
callAccept(Message & msg)903 void Channel::callAccept(Message& msg)
904 {
905 status("accepted");
906 int defTout = m_driver ? m_driver->timeout() : -1;
907 if (defTout <= 0)
908 defTout = -1;
909 setMaxcall(msg,defTout);
910 setChanParams(msg,true);
911 if (m_billid.null())
912 m_billid = msg.getValue(YSTRING("billid"));
913 m_targetid = msg.getValue(YSTRING("targetid"));
914 String detect = msg.getValue(YSTRING("tonedetect_in"));
915 if (detect && detect.toBoolean(true)) {
916 if (detect.toBoolean(false))
917 detect = "tone/*";
918 toneDetect(detect);
919 }
920 if (msg.getBoolValue(YSTRING("autoanswer")))
921 msgAnswered(msg);
922 else if (msg.getBoolValue(YSTRING("autoring")))
923 msgRinging(msg);
924 else if (msg.getBoolValue(YSTRING("autoprogress")))
925 msgProgress(msg);
926 else if (m_targetid.null() && msg.getBoolValue(YSTRING("autoanswer"),true)) {
927 // no preference exists in the message so issue a notice
928 TraceDebug(traceId(),this,DebugNote,"Answering now call %s because we have no targetid [%p]",
929 id().c_str(),this);
930 msgAnswered(msg);
931 }
932 }
933
callConnect(Message & msg)934 void Channel::callConnect(Message& msg)
935 {
936 String detect = msg.getValue(YSTRING("tonedetect_out"));
937 if (detect && detect.toBoolean(true)) {
938 if (detect.toBoolean(false))
939 detect = "tone/*";
940 toneDetect(detect);
941 }
942 }
943
callRejected(const char * error,const char * reason,const Message * msg)944 void Channel::callRejected(const char* error, const char* reason, const Message* msg)
945 {
946 TraceDebug(traceId(),this,DebugMild,"Call rejected error='%s' reason='%s' [%p]",error,reason,this);
947 if (msg) {
948 const String* cp = msg->getParam(s_copyParams);
949 if (!TelEngine::null(cp)) {
950 s_paramMutex.lock();
951 parameters().copyParams(*msg,*cp);
952 s_paramMutex.unlock();
953 }
954 setChanParams(*msg,true);
955 }
956 status("rejected");
957 }
958
dtmfSequence(Message & msg)959 bool Channel::dtmfSequence(Message& msg)
960 {
961 if ((msg != YSTRING("chan.dtmf")) || msg.getParam(YSTRING("sequence")))
962 return false;
963 bool duplicate = false;
964 const String* detected = msg.getParam(YSTRING("detected"));
965 const String* text = msg.getParam(YSTRING("text"));
966 Lock lock(mutex());
967 unsigned int seq = m_dtmfSeq;
968 if (text && detected &&
969 (*text == m_dtmfText) && (*detected != m_dtmfDetected) &&
970 (msg.msgTime() < m_dtmfTime))
971 duplicate = true;
972 else {
973 seq = ++m_dtmfSeq;
974 m_dtmfTime = msg.msgTime() + 4000000;
975 m_dtmfText = text;
976 m_dtmfDetected = detected;
977 }
978 // need to add sequence number used to detect reorders
979 msg.addParam("sequence",String(seq));
980 msg.addParam("duplicate",String::boolText(duplicate));
981 return duplicate;
982 }
983
dtmfEnqueue(Message * msg)984 bool Channel::dtmfEnqueue(Message* msg)
985 {
986 if (!msg)
987 return false;
988 if (dtmfSequence(*msg) && m_driver && !m_driver->m_dtmfDups) {
989 TraceDebug(traceId(),this,DebugNote,"Dropping duplicate '%s' DTMF '%s' [%p]",
990 msg->getValue("detected"),msg->getValue("text"),this);
991 TelEngine::destruct(msg);
992 return false;
993 }
994 return Engine::enqueue(msg);
995 }
996
dtmfInband(const char * tone)997 bool Channel::dtmfInband(const char* tone)
998 {
999 if (null(tone))
1000 return false;
1001 Message m("chan.attach");
1002 complete(m,true);
1003 m.userData(this);
1004 String tmp("tone/dtmfstr/");
1005 tmp += tone;
1006 m.setParam("override",tmp);
1007 m.setParam("single","yes");
1008 return Engine::dispatch(m);
1009 }
1010
toneDetect(const char * sniffer)1011 bool Channel::toneDetect(const char* sniffer)
1012 {
1013 if (null(sniffer))
1014 sniffer = "tone/*";
1015 Message m("chan.attach");
1016 complete(m,true);
1017 m.userData(this);
1018 m.setParam("sniffer",sniffer);
1019 m.setParam("single","yes");
1020 return Engine::dispatch(m);
1021 }
1022
setDebug(Message & msg)1023 bool Channel::setDebug(Message& msg)
1024 {
1025 String str = msg.getValue("line");
1026 if (str.startSkip("level")) {
1027 int dbg = debugLevel();
1028 str >> dbg;
1029 if (str == "+") {
1030 if (debugLevel() > dbg)
1031 dbg = debugLevel();
1032 }
1033 else if (str == "-") {
1034 if (debugLevel() < dbg)
1035 dbg = debugLevel();
1036 }
1037 debugLevel(dbg);
1038 }
1039 else if (str == "reset")
1040 debugChain(m_driver);
1041 else if (str == "engine")
1042 debugCopy();
1043 else if (str.isBoolean())
1044 debugEnabled(str.toBoolean(debugEnabled()));
1045 msg.retValue() << "Channel " << id()
1046 << " debug " << (debugEnabled() ? "on" : "off")
1047 << " level " << debugLevel() << (debugChained() ? " chained" : "") << "\r\n";
1048 return true;
1049 }
1050
allocId()1051 unsigned int Channel::allocId()
1052 {
1053 s_callidMutex.lock();
1054 unsigned int id = ++s_callid;
1055 s_callidMutex.unlock();
1056 return id;
1057 }
1058
1059 TokenDict Module::s_messages[] = {
1060 { "engine.status", Module::Status },
1061 { "engine.timer", Module::Timer },
1062 { "engine.debug", Module::Level },
1063 { "engine.command", Module::Command },
1064 { "engine.help", Module::Help },
1065 { "engine.halt", Module::Halt },
1066 { "engine.stop", Module::Stop },
1067 { "call.route", Module::Route },
1068 { "call.execute", Module::Execute },
1069 { "call.drop", Module::Drop },
1070 { "call.progress", Module::Progress },
1071 { "call.ringing", Module::Ringing },
1072 { "call.answered", Module::Answered },
1073 { "call.update", Module::Update },
1074 { "chan.dtmf", Module::Tone },
1075 { "chan.text", Module::Text },
1076 { "chan.masquerade", Module::Masquerade },
1077 { "chan.locate", Module::Locate },
1078 { "chan.transfer", Module::Transfer },
1079 { "chan.control", Module::Control },
1080 { "msg.execute", Module::MsgExecute },
1081 { 0, 0 }
1082 };
1083
1084 unsigned int Module::s_delay = 5;
1085
messageName(int id)1086 const char* Module::messageName(int id)
1087 {
1088 if ((id <= 0) || (id >PubLast))
1089 return 0;
1090 return lookup(id,s_messages);
1091 }
1092
Module(const char * name,const char * type,bool earlyInit)1093 Module::Module(const char* name, const char* type, bool earlyInit)
1094 : Plugin(name,earlyInit), Mutex(true,"Module"),
1095 m_init(false), m_relays(0), m_type(type), m_changed(0)
1096 {
1097 }
1098
~Module()1099 Module::~Module()
1100 {
1101 }
1102
getObject(const String & name) const1103 void* Module::getObject(const String& name) const
1104 {
1105 if (name == YATOM("Module"))
1106 return const_cast<Module*>(this);
1107 return Plugin::getObject(name);
1108 }
1109
installRelay(int id,const char * name,unsigned priority)1110 bool Module::installRelay(int id, const char* name, unsigned priority)
1111 {
1112 if (!(id && name && priority))
1113 return false;
1114
1115 TempObjectCounter cnt(objectsCounter(),true);
1116 Lock lock(this);
1117 if (m_relays & id)
1118 return true;
1119 m_relays |= id;
1120
1121 MessageRelay* relay = new MessageRelay(name,this,id,priority,Module::name());
1122 m_relayList.append(relay)->setDelete(false);
1123 Engine::install(relay);
1124 return true;
1125 }
1126
installRelay(int id,unsigned priority)1127 bool Module::installRelay(int id, unsigned priority)
1128 {
1129 return installRelay(id,messageName(id),priority);
1130 }
1131
installRelay(const char * name,unsigned priority)1132 bool Module::installRelay(const char* name, unsigned priority)
1133 {
1134 return installRelay(lookup(name,s_messages),name,priority);
1135 }
1136
installRelay(MessageRelay * relay)1137 bool Module::installRelay(MessageRelay* relay)
1138 {
1139 if (!relay || ((relay->id() & m_relays) != 0) || m_relayList.find(relay))
1140 return false;
1141 m_relays |= relay->id();
1142 m_relayList.append(relay)->setDelete(false);
1143 Engine::install(relay);
1144 return true;
1145 }
1146
uninstallRelay(MessageRelay * relay,bool delRelay)1147 bool Module::uninstallRelay(MessageRelay* relay, bool delRelay)
1148 {
1149 if (!relay || ((relay->id() & m_relays) == 0) || !m_relayList.remove(relay,false))
1150 return false;
1151 Engine::uninstall(relay);
1152 m_relays &= ~relay->id();
1153 if (delRelay)
1154 TelEngine::destruct(relay);
1155 return true;
1156 }
1157
uninstallRelay(int id,bool delRelay)1158 bool Module::uninstallRelay(int id, bool delRelay)
1159 {
1160 if ((id & m_relays) == 0)
1161 return false;
1162 for (ObjList* l = m_relayList.skipNull(); l; l = l->skipNext()) {
1163 MessageRelay* r = static_cast<MessageRelay*>(l->get());
1164 if (r->id() != id)
1165 continue;
1166 Engine::uninstall(r);
1167 m_relays &= ~id;
1168 l->remove(delRelay);
1169 break;
1170 }
1171 return false;
1172 }
1173
1174
uninstallRelays()1175 bool Module::uninstallRelays()
1176 {
1177 while (MessageRelay* relay = static_cast<MessageRelay*>(m_relayList.remove(false))) {
1178 Engine::uninstall(relay);
1179 m_relays &= ~relay->id();
1180 relay->destruct();
1181 }
1182 return (0 == m_relays) && (0 == m_relayList.count());
1183 }
1184
initialize()1185 void Module::initialize()
1186 {
1187 setup();
1188 }
1189
setup()1190 void Module::setup()
1191 {
1192 DDebug(this,DebugAll,"Module::setup()");
1193 if (m_init)
1194 return;
1195 m_init = true;
1196 installRelay(Timer,90);
1197 installRelay(Status,110);
1198 installRelay(Level,120);
1199 installRelay(Command,120);
1200 }
1201
changed()1202 void Module::changed()
1203 {
1204 if (s_delay && !m_changed)
1205 m_changed = Time::now() + s_delay*(u_int64_t)1000000;
1206 }
1207
msgTimer(Message & msg)1208 void Module::msgTimer(Message& msg)
1209 {
1210 if (m_changed && (msg.msgTime() > m_changed)) {
1211 Message* m = new Message("module.update");
1212 m->addParam("module",name());
1213 m_changed = 0;
1214 genUpdate(*m);
1215 Engine::enqueue(m);
1216 }
1217 }
1218
msgRoute(Message & msg)1219 bool Module::msgRoute(Message& msg)
1220 {
1221 return false;
1222 }
1223
msgCommand(Message & msg)1224 bool Module::msgCommand(Message& msg)
1225 {
1226 const NamedString* line = msg.getParam(YSTRING("line"));
1227 if (line)
1228 return commandExecute(msg.retValue(),*line);
1229 if (msg.getParam(YSTRING("partline")) || msg.getParam(YSTRING("partword")))
1230 return commandComplete(msg,msg.getValue(YSTRING("partline")),msg.getValue(YSTRING("partword")));
1231 return false;
1232 }
1233
commandExecute(String & retVal,const String & line)1234 bool Module::commandExecute(String& retVal, const String& line)
1235 {
1236 return false;
1237 }
1238
commandComplete(Message & msg,const String & partLine,const String & partWord)1239 bool Module::commandComplete(Message& msg, const String& partLine, const String& partWord)
1240 {
1241 if ((partLine == YSTRING("debug")) || (partLine == YSTRING("status")))
1242 itemComplete(msg.retValue(),name(),partWord);
1243 return false;
1244 }
1245
itemComplete(String & itemList,const String & item,const String & partWord)1246 bool Module::itemComplete(String& itemList, const String& item, const String& partWord)
1247 {
1248 if (partWord.null() || item.startsWith(partWord)) {
1249 itemList.append(item,"\t");
1250 return true;
1251 }
1252 return false;
1253 }
1254
msgStatus(Message & msg)1255 void Module::msgStatus(Message& msg)
1256 {
1257 String mod, par, det;
1258 bool details = msg.getBoolValue(YSTRING("details"),true);
1259 lock();
1260 statusModule(mod);
1261 statusParams(par);
1262 if (details)
1263 statusDetail(det);
1264 unlock();
1265 msg.retValue() << mod << ";" << par;
1266 if (det)
1267 msg.retValue() << ";" << det;
1268 msg.retValue() << "\r\n";
1269 }
1270
statusModule(String & str)1271 void Module::statusModule(String& str)
1272 {
1273 str.append("name=",",") << name();
1274 if (m_type)
1275 str << ",type=" << m_type;
1276 }
1277
statusParams(String & str)1278 void Module::statusParams(String& str)
1279 {
1280 }
1281
statusDetail(String & str)1282 void Module::statusDetail(String& str)
1283 {
1284 }
1285
genUpdate(Message & msg)1286 void Module::genUpdate(Message& msg)
1287 {
1288 }
1289
received(Message & msg,int id)1290 bool Module::received(Message &msg, int id)
1291 {
1292 if (name().null())
1293 return false;
1294
1295 switch (id) {
1296 case Timer:
1297 lock();
1298 msgTimer(msg);
1299 unlock();
1300 return false;
1301 case Route:
1302 return msgRoute(msg);
1303 }
1304
1305 String dest = msg.getValue(YSTRING("module"));
1306
1307 if (id == Status) {
1308 if (dest == name()) {
1309 msgStatus(msg);
1310 return true;
1311 }
1312 if (dest.null() || (dest == m_type))
1313 msgStatus(msg);
1314 return false;
1315 }
1316 else if (id == Level)
1317 return setDebug(msg,dest);
1318 else if (id == Command)
1319 return msgCommand(msg);
1320
1321 return false;
1322 }
1323
setDebug(Message & msg,const String & target)1324 bool Module::setDebug(Message& msg, const String& target)
1325 {
1326 if (target != name())
1327 return false;
1328
1329 NamedCounter* counter = objectsCounter();
1330 String str = msg.getValue("line");
1331 if (str.startSkip("level")) {
1332 int dbg = debugLevel();
1333 str >> dbg;
1334 if (str == "+") {
1335 if (debugLevel() > dbg)
1336 dbg = debugLevel();
1337 }
1338 else if (str == "-") {
1339 if (debugLevel() < dbg)
1340 dbg = debugLevel();
1341 }
1342 debugLevel(dbg);
1343 }
1344 else if (str == "reset") {
1345 debugLevel(TelEngine::debugLevel());
1346 debugEnabled(true);
1347 if (counter)
1348 counter->enable(getObjCounting());
1349 }
1350 else if (str.startSkip("objects")) {
1351 bool dbg = (str == "reset") ? getObjCounting() : (counter && counter->enabled());
1352 str >> dbg;
1353 if (counter)
1354 counter->enable(dbg);
1355 }
1356 else if (str.startSkip("filter"))
1357 m_filter = str;
1358 else {
1359 bool dbg = debugEnabled();
1360 str >> dbg;
1361 debugEnabled(dbg);
1362 }
1363 msg.retValue() << "Module " << name()
1364 << " debug " << (debugEnabled() ? "on" : "off")
1365 << " level " << debugLevel()
1366 << " objects " << ((counter && counter->enabled()) ? "on" : "off");
1367 if (m_filter)
1368 msg.retValue() << " filter: " << m_filter;
1369 msg.retValue() << "\r\n";
1370 return true;
1371 }
1372
filterDebug(const String & item) const1373 bool Module::filterDebug(const String& item) const
1374 {
1375 return m_filter.null() ? debugEnabled() : m_filter.matches(item);
1376 }
1377
1378
Driver(const char * name,const char * type)1379 Driver::Driver(const char* name, const char* type)
1380 : Module(name,type),
1381 m_init(false), m_varchan(true),
1382 m_routing(0), m_routed(0), m_total(0),
1383 m_nextid(0), m_timeout(0),
1384 m_maxroute(0), m_maxchans(0), m_chanCount(0),
1385 m_dtmfDups(false), m_doExpire(true)
1386 {
1387 m_prefix << name << "/";
1388 }
1389
getObject(const String & name) const1390 void* Driver::getObject(const String& name) const
1391 {
1392 if (name == YATOM("Driver"))
1393 return const_cast<Driver*>(this);
1394 return Module::getObject(name);
1395 }
1396
initialize()1397 void Driver::initialize()
1398 {
1399 setup();
1400 }
1401
setup(const char * prefix,bool minimal)1402 void Driver::setup(const char* prefix, bool minimal)
1403 {
1404 DDebug(this,DebugAll,"Driver::setup('%s',%d)",prefix,minimal);
1405 Module::setup();
1406 loadLimits();
1407 if (m_init)
1408 return;
1409 m_init = true;
1410 m_prefix = prefix ? prefix : name().c_str();
1411 if (m_prefix && !m_prefix.endsWith("/"))
1412 m_prefix += "/";
1413 XDebug(DebugAll,"setup name='%s' prefix='%s'",name().c_str(),m_prefix.c_str());
1414 installRelay(Masquerade,10);
1415 installRelay(Locate,40);
1416 installRelay(Drop,60);
1417 installRelay(Execute,90);
1418 installRelay(Control,90);
1419 if (minimal)
1420 return;
1421 installRelay(Tone);
1422 installRelay(Text);
1423 installRelay(Ringing);
1424 installRelay(Answered);
1425 }
1426
isBusy() const1427 bool Driver::isBusy() const
1428 {
1429 return (m_routing || m_chanCount);
1430 }
1431
find(const String & id) const1432 Channel* Driver::find(const String& id) const
1433 {
1434 const ObjList* pos = m_chans.find(id);
1435 return pos ? static_cast<Channel*>(pos->get()) : 0;
1436 }
1437
// Relay entry point of the driver: dispatch a received message either to
// the driver itself, to the Module base class or to the channel it targets.
// A driver with an empty prefix ignores all messages.
// The returned value tells if the message was handled.
bool Driver::received(Message &msg, int id)
{
    if (!m_prefix)
        return false;
    // pick destination depending on message type
    String dest;
    switch (id) {
        case Timer:
            // m_doExpire is cleared while a scan runs and set back when
            // it ends; it is tested again once the mutex was obtained.
            // NOTE(review): 950000 is the lock wait limit, presumably
            // in microseconds - confirm against Mutex::lock()
            if (m_doExpire && lock(950000)) {
                if (m_doExpire) {
                    m_doExpire = false;
                    // check each channel for timeouts
                    ListIterator iter(m_chans);
                    Time t;
                    for (;;) {
                        // take the next channel while the driver is locked,
                        // then check its timers with the driver unlocked
                        RefPointer<Channel> c = static_cast<Channel*>(iter.get());
                        unlock();
                        if (!c)
                            break;
                        c->checkTimers(msg,t);
                        // release the reference before locking again
                        c = 0;
                        lock();
                    }
                    // the loop always exits with the driver unlocked
                    m_doExpire = true;
                }
                else
                    unlock();
            }
            // the base class timer handling is performed in all cases
            return Module::received(msg,id);
        case Status:
            // check if it's a channel status request
            dest = msg.getValue(YSTRING("module"));
            if (dest.startsWith(m_prefix))
                break;
            // fall through
        case Level:
        case Route:
        case Command:
            return Module::received(msg,id);
        case Halt:
            dropAll(msg);
            return false;
        case Execute:
            dest = msg.getValue(YSTRING("callto"));
            break;
        case Drop:
        case Masquerade:
        case Locate:
            dest = msg.getValue(YSTRING("id"));
            break;
        default:
            dest = msg.getValue(YSTRING("peerid"));
            // if this channel is not the peer, try to match it as target
            if (!dest.startsWith(m_prefix))
                dest = msg.getValue(YSTRING("targetid"));
            break;
    }
    XDebug(DebugAll,"id=%d prefix='%s' dest='%s'",id,m_prefix.c_str(),dest.c_str());

    // a drop with no target or addressed to the driver's name or type
    // drops all channels; only the exact name match stops the message
    if (id == Drop) {
        bool exact = (dest == name());
        if (exact || dest.null() || (dest == type())) {
            dropAll(msg);
            return exact;
        }
    }

    // handle call.execute which should start a new channel
    if (id == Execute) {
        if (!canAccept(false))
            return false;
        // the target must start with our prefix or be a "line/" target
        // for a line we have; the matched prefix is removed from dest
        if (dest.startSkip(m_prefix,false) ||
            (dest.startSkip("line/",false) && hasLine(msg.getValue(YSTRING("line"))))) {
            if (msg.getBoolValue(YSTRING("stop_call"),false) && !canStopCall()) {
                msg.setParam(YSTRING("error"),"stopped_call");
                return false;
            }
            return msgExecute(msg,dest);
        }
        return false;
    }

    // check if the message was for this driver
    if (!dest.startsWith(m_prefix))
        return false;

    // find the channel under lock and keep it referenced after unlocking
    lock();
    RefPointer<Channel> chan = find(dest);
    unlock();
    if (!chan) {
        DDebug(this,DebugMild,"Could not find channel '%s'",dest.c_str());
        return false;
    }

    // forward the message to the handler of the channel
    switch (id) {
        case Status:
            chan->msgStatus(msg);
            return true;
        // call progress indications are passed only to incoming channels
        // that are not answered yet
        case Progress:
            return chan->isIncoming() && !chan->isAnswered() && chan->msgProgress(msg);
        case Ringing:
            return chan->isIncoming() && !chan->isAnswered() && chan->msgRinging(msg);
        case Answered:
            return chan->isIncoming() && !chan->isAnswered() && chan->msgAnswered(msg);
        case Tone:
            return chan->msgTone(msg,msg.getValue("text"));
        case Text:
            return chan->msgText(msg,msg.getValue("text"));
        case Drop:
            return chan->msgDrop(msg,msg.getValue("reason"));
        case Transfer:
            return chan->msgTransfer(msg);
        case Update:
            return chan->msgUpdate(msg);
        case Masquerade:
            // rename the message to the value of its "message" parameter,
            // drop that parameter and attach the channel as user data
            msg = msg.getValue(YSTRING("message"));
            msg.clearParam(YSTRING("message"));
            msg.userData(chan);
            if (chan->msgMasquerade(msg))
                return true;
            // not handled by the channel: let it complete the message
            // and allow further processing
            chan->complete(msg,msg.getBoolValue(YSTRING("complete_minimal"),false));
            return false;
        case Locate:
            msg.userData(chan);
            return true;
        case Control:
            return chan->msgControl(msg);
    }
    return false;
}
1568
dropAll(Message & msg)1569 void Driver::dropAll(Message &msg)
1570 {
1571 const char* reason = msg.getValue(YSTRING("reason"));
1572 lock();
1573 ListIterator iter(m_chans);
1574 for (;;) {
1575 RefPointer<Channel> c = static_cast<Channel*>(iter.get());
1576 unlock();
1577 if (!c)
1578 break;
1579 DDebug(this,DebugAll,"Dropping %s channel '%s' @%p [%p]",
1580 name().c_str(),c->id().c_str(),static_cast<Channel*>(c),this);
1581 c->msgDrop(msg,reason);
1582 c = 0;
1583 lock();
1584 }
1585 }
1586
canAccept(bool routers)1587 bool Driver::canAccept(bool routers)
1588 {
1589 if (Engine::exiting())
1590 return false;
1591 if (routers && !canRoute())
1592 return false;
1593 if (m_maxchans)
1594 return (m_chanCount < m_maxchans);
1595 return true;
1596 }
1597
canRoute()1598 bool Driver::canRoute()
1599 {
1600 if (Engine::exiting() || (Engine::accept() >= Engine::Congestion))
1601 return false;
1602 if (m_maxroute && (m_routing >= m_maxroute))
1603 return false;
1604 return true;
1605 }
1606
hasLine(const String & line) const1607 bool Driver::hasLine(const String& line) const
1608 {
1609 return false;
1610 }
1611
msgRoute(Message & msg)1612 bool Driver::msgRoute(Message& msg)
1613 {
1614 String called = msg.getValue(YSTRING("called"));
1615 if (called.null())
1616 return false;
1617 String line = msg.getValue(YSTRING("line"));
1618 if (line.null())
1619 line = msg.getValue(YSTRING("account"));
1620 if (line && hasLine(line)) {
1621 // asked to route to a line we have locally
1622 msg.setParam("line",line);
1623 msg.retValue() = prefix() + called;
1624 return true;
1625 }
1626 return Module::msgRoute(msg);
1627 }
1628
genUpdate(Message & msg)1629 void Driver::genUpdate(Message& msg)
1630 {
1631 msg.addParam("routed",String(m_routed));
1632 msg.addParam("routing",String(m_routing));
1633 msg.addParam("total",String(m_total));
1634 msg.addParam("chans",String(m_chanCount));
1635 }
1636
statusModule(String & str)1637 void Driver::statusModule(String& str)
1638 {
1639 Module::statusModule(str);
1640 str.append("format=Status|Address|Peer",",");
1641 }
1642
statusParams(String & str)1643 void Driver::statusParams(String& str)
1644 {
1645 Module::statusParams(str);
1646 str.append("routed=",",") << m_routed;
1647 str << ",routing=" << m_routing;
1648 str << ",total=" << m_total;
1649 str << ",chans=" << m_chanCount;
1650 }
1651
statusDetail(String & str)1652 void Driver::statusDetail(String& str)
1653 {
1654 ObjList* l = m_chans.skipNull();
1655 for (; l; l=l->skipNext()) {
1656 Channel* c = static_cast<Channel*>(l->get());
1657 str.append(c->id(),",") << "=";
1658 c->getStatus(str);
1659 str << "|" << String::uriEscape(c->address(),",;|"," +?&")
1660 << "|" << c->getPeerId();
1661 }
1662 }
1663
commandComplete(Message & msg,const String & partLine,const String & partWord)1664 bool Driver::commandComplete(Message& msg, const String& partLine, const String& partWord)
1665 {
1666 bool ok = false;
1667 bool listChans = String(msg.getValue(YSTRING("complete"))) == YSTRING("channels");
1668 if (listChans && (partWord.null() || name().startsWith(partWord)))
1669 msg.retValue().append(name(),"\t");
1670 else
1671 ok = Module::commandComplete(msg,partLine,partWord);
1672 lock();
1673 unsigned int nchans = m_chans.count();
1674 unlock();
1675 if (nchans && listChans) {
1676 if (name().startsWith(partWord)) {
1677 msg.retValue().append(prefix(),"\t");
1678 return ok;
1679 }
1680 if (partWord.startsWith(prefix()))
1681 ok = true;
1682 lock();
1683 ObjList* l = m_chans.skipNull();
1684 for (; l; l=l->skipNext()) {
1685 Channel* c = static_cast<Channel*>(l->get());
1686 if (c->id().startsWith(partWord))
1687 msg.retValue().append(c->id(),"\t");
1688 }
1689 unlock();
1690 }
1691 return ok;
1692 }
1693
setDebug(Message & msg,const String & target)1694 bool Driver::setDebug(Message& msg, const String& target)
1695 {
1696 if (!target.startsWith(m_prefix))
1697 return Module::setDebug(msg,target);
1698
1699 Lock lock(this);
1700 Channel* chan = find(target);
1701 if (chan)
1702 return chan->setDebug(msg);
1703
1704 return false;
1705 }
1706
loadLimits()1707 void Driver::loadLimits()
1708 {
1709 timeout(Engine::config().getIntValue(YSTRING("telephony"),"timeout"));
1710 maxRoute(Engine::config().getIntValue(YSTRING("telephony"),"maxroute"));
1711 maxChans(Engine::config().getIntValue(YSTRING("telephony"),"maxchans"));
1712 dtmfDups(Engine::config().getBoolValue(YSTRING("telephony"),"dtmfdups"));
1713 }
1714
nextid()1715 unsigned int Driver::nextid()
1716 {
1717 Lock lock(this);
1718 return ++m_nextid;
1719 }
1720
1721
Router(Driver * driver,const char * id,Message * msg)1722 Router::Router(Driver* driver, const char* id, Message* msg)
1723 : Thread("Call Router"), m_driver(driver), m_id(id), m_msg(msg)
1724 {
1725 if (driver)
1726 setObjCounter(driver->objectsCounter());
1727 }
1728
run()1729 void Router::run()
1730 {
1731 if (!(m_driver && m_msg))
1732 return;
1733 m_driver->lock();
1734 m_driver->m_routing++;
1735 m_driver->changed();
1736 m_driver->unlock();
1737 bool ok = route();
1738 m_driver->lock();
1739 m_driver->m_routing--;
1740 if (ok)
1741 m_driver->m_routed++;
1742 m_driver->changed();
1743 m_driver->unlock();
1744 }
1745
// Route the call described by the message held by this thread.
// Unless a "callto" is already present the message is dispatched as
// call.preroute (only if it has that name) and then as call.route.
// On a usable route the message is turned into call.execute and
// dispatched again; every failure is reported to the channel through
// callRejected(). The channel is looked up again by ID after each
// dispatch since it may have disappeared in the mean time.
// Returns the result of the last dispatch, false if the channel vanished
// or the call was dropped while prerouting.
bool Router::route()
{
    DDebug(m_driver,DebugAll,"Routing thread for '%s' [%p]",m_id.c_str(),this);

    RefPointer<Channel> chan;
    // a "callto" already present in the message bypasses the routing
    String tmp(m_msg->getValue(YSTRING("callto")));
    bool ok = !tmp.null();
    if (ok)
        m_msg->retValue() = tmp;
    else {
        if (*m_msg == YSTRING("call.preroute")) {
            ok = Engine::dispatch(m_msg);
            // find the channel under lock, keep it referenced afterwards
            m_driver->lock();
            chan = m_driver->find(m_id);
            m_driver->unlock();
            if (!chan) {
                Debug(m_driver,DebugInfo,"Connection '%s' vanished while prerouting!",m_id.c_str());
                return false;
            }
            // copy the requested parameters from the message to the channel
            const String* cp = m_msg->getParam(s_copyParams);
            if (!TelEngine::null(cp)) {
                Channel::paramMutex().lock();
                chan->parameters().copyParams(*m_msg,*cp);
                Channel::paramMutex().unlock();
            }
            // a handled preroute returning "-" or "error" rejects the call
            bool dropCall = ok && ((m_msg->retValue() == YSTRING("-")) || (m_msg->retValue() == YSTRING("error")));
            if (dropCall)
                chan->callRejected(m_msg->getValue(YSTRING("error"),"unknown"),
                    m_msg->getValue(YSTRING("reason")),m_msg);
            else
                dropCall = !chan->callPrerouted(*m_msg,ok);
            if (dropCall) {
                // get rid of the dynamic chans
                // NOTE(review): this deref() is in addition to the one
                // done by the RefPointer, presumably releasing the
                // channel's initial reference - confirm
                if (m_driver->varchan())
                    chan->deref();
                return false;
            }
            chan = 0;
            // reuse the message for the routing stage
            *m_msg = "call.route";
            m_msg->retValue().clear();
            if (Engine::trackParam())
                m_msg->clearParam(Engine::trackParam());
            m_msg->msgTime() = Time::now();
        }
        ok = Engine::dispatch(m_msg);
    }

    m_driver->lock();
    chan = m_driver->find(m_id);
    m_driver->unlock();

    if (!chan) {
        Debug(m_driver,DebugInfo,"Connection '%s' vanished while routing!",m_id.c_str());
        return false;
    }
    // chan will keep it referenced even if message user data is changed
    m_msg->userData(chan);

    // default error names, compared by address below to detect that the
    // message did not provide its own "error"
    static const char s_noroute[] = "noroute";
    static const char s_looping[] = "looping";
    static const char s_noconn[] = "noconn";

    if (ok && m_msg->retValue().trimSpaces()) {
        // routed to "-" or "error": the call is explicitly rejected
        if ((m_msg->retValue() == YSTRING("-")) || (m_msg->retValue() == YSTRING("error")))
            chan->callRejected(m_msg->getValue(YSTRING("error"),"unknown"),
                m_msg->getValue("reason"),m_msg);
        // an "antiloop" counter that is zero or negative rejects the call
        else if (m_msg->getIntValue(YSTRING("antiloop"),1) <= 0) {
            const char* error = m_msg->getValue(YSTRING("error"),s_looping);
            chan->callRejected(error,m_msg->getValue(YSTRING("reason"),
                ((s_looping == error) ? "Call is looping" : (const char*)0)),m_msg);
        }
        else if (chan->callRouted(*m_msg)) {
            // reuse the message to execute the call to the routed target
            *m_msg = "call.execute";
            m_msg->setParam("callto",m_msg->retValue());
            m_msg->clearParam(YSTRING("error"));
            m_msg->retValue().clear();
            if (Engine::trackParam())
                m_msg->clearParam(Engine::trackParam());
            m_msg->msgTime() = Time::now();
            ok = Engine::dispatch(m_msg);
            if (ok)
                chan->callAccept(*m_msg);
            else {
                const char* error = m_msg->getValue(YSTRING("error"),s_noconn);
                const char* reason = m_msg->getValue(YSTRING("reason"),
                    ((s_noconn == error) ? "Could not connect to target" : (const char*)0));
                // dispatch a message named by s_disconnected asking for a
                // reroute; the call is rejected only if nobody handles it
                Message m(s_disconnected);
                const String* cp = m_msg->getParam(s_copyParams);
                if (!TelEngine::null(cp))
                    m.copyParams(*m_msg,*cp);
                chan->complete(m);
                m.setParam("error",error);
                m.setParam("reason",reason);
                m.setParam("reroute",String::boolText(true));
                m.userData(chan);
                m.setNotify();
                if (!Engine::dispatch(m))
                    chan->callRejected(error,reason,m_msg);
            }
        }
    }
    else {
        // routing failed or returned an empty target
        const char* error = m_msg->getValue(YSTRING("error"),s_noroute);
        chan->callRejected(error,m_msg->getValue(YSTRING("reason"),
            ((s_noroute == error) ? "No route to call target" : (const char*)0)),m_msg);
    }

    // dereference again if the channel is dynamic
    if (m_driver->varchan())
        chan->deref();
    return ok;
}
1858
cleanup()1859 void Router::cleanup()
1860 {
1861 destruct(m_msg);
1862 }
1863
1864
pickAccountParams(const NamedList & params)1865 void CallAccount::pickAccountParams(const NamedList& params)
1866 {
1867 NamedIterator iter(params);
1868 Lock mylock(m_mutex);
1869 m_inbParams.clearParams();
1870 m_outParams.clearParams();
1871 m_regParams.clearParams();
1872 while (const NamedString* n = iter.get()) {
1873 if (n->name().length() <= 4)
1874 continue;
1875 String name = n->name().substr(4).trimSpaces();
1876 if (n->name().startsWith("reg:"))
1877 m_regParams.setParam(name,*n);
1878 else if (n->name().startsWith("inb:"))
1879 m_inbParams.setParam(name,*n);
1880 else if (n->name().startsWith("out:"))
1881 m_outParams.setParam(name,*n);
1882 }
1883 }
1884
setInboundParams(NamedList & params)1885 void CallAccount::setInboundParams(NamedList& params)
1886 {
1887 Lock mylock(m_mutex);
1888 NamedIterator iter(m_inbParams);
1889 while (const NamedString* n = iter.get()) {
1890 String tmp(*n);
1891 params.replaceParams(tmp);
1892 params.setParam(n->name(),tmp);
1893 }
1894 }
1895
setOutboundParams(NamedList & params)1896 void CallAccount::setOutboundParams(NamedList& params)
1897 {
1898 Lock mylock(m_mutex);
1899 NamedIterator iter(m_outParams);
1900 while (const NamedString* n = iter.get()) {
1901 String tmp(*n);
1902 params.replaceParams(tmp);
1903 params.setParam(n->name(),tmp);
1904 }
1905 }
1906
setRegisterParams(NamedList & params)1907 void CallAccount::setRegisterParams(NamedList& params)
1908 {
1909 Lock mylock(m_mutex);
1910 NamedIterator iter(m_regParams);
1911 while (const NamedString* n = iter.get()) {
1912 String tmp(*n);
1913 params.replaceParams(tmp);
1914 params.setParam(n->name(),tmp);
1915 }
1916 }
1917
1918 /* vi: set ts=8 sw=4 sts=4 noet: */
1919