1 /**
2 * jbengine.cpp
3 * Yet Another Jabber Component Protocol Stack
4 * This file is part of the YATE Project http://YATE.null.ro
5 *
6 * Yet Another Telephony Engine - a fully featured software PBX and IVR
7 * Copyright (C) 2004-2014 Null Team
8 *
9 * This software is distributed under multiple licenses;
10 * see the COPYING file in the main directory for licensing
11 * information for this specific distribution.
12 *
13 * This use of this software may be subject to additional restrictions.
14 * See the LEGAL file in the main directory for details.
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
19 */
20
21 #include <yatejabber.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24
25 using namespace TelEngine;
26
27
fixValue(const NamedList & p,const char * param,unsigned int defVal,unsigned int min,unsigned int max,bool zero=false)28 static unsigned int fixValue(const NamedList& p, const char* param,
29 unsigned int defVal, unsigned int min, unsigned int max, bool zero = false)
30 {
31 unsigned int val = p.getIntValue(param,defVal);
32 if (!val) {
33 if (!zero)
34 val = defVal;
35 }
36 else if (val < min)
37 val = min;
38 else if (val > max)
39 val = max;
40 return val;
41 }
42
43 const TokenDict JBEvent::s_type[] = {
44 {"Message", Message},
45 {"Presence", Presence},
46 {"Iq", Iq},
47 {"Terminated", Terminated},
48 {"Destroy", Destroy},
49 {"Start", Start},
50 {"Auth", Auth},
51 {"Bind", Bind},
52 {"Running", Running},
53 {"DbResult", DbResult},
54 {"DbVerify", DbVerify},
55 {"RegisterOk", RegisterOk},
56 {"RegisterFailed", RegisterFailed},
57 {"Unknown", Unknown},
58 {0,0}
59 };
60
61 const TokenDict JBConnect::s_statusName[] = {
62 {"Start", Start},
63 {"Address", Address},
64 {"Srv", Srv},
65 {"Domain", Domain},
66 {0,0}
67 };
68
69 // Entity caps item tag in document
70 static const String s_entityCapsItem = "item";
71 // Node values used by entity caps
72 static const String s_googleTalkNode = "http://www.google.com/xmpp/client/caps";
73 static const String s_googleMailNode = "http://mail.google.com/xmpp/client/caps";
74 static const String s_googleAndroidNode = "http://www.android.com/gtalk/client/caps";
75 static const String s_googleAndroidNode2 = "http://www.android.com/gtalk/client/caps2";
76
77 // Stream read buffer
78 #define JB_STREAMBUF 8192
79 #define JB_STREAMBUF_MIN 1024
80 // Stream restart counter
81 #define JB_RESTART_COUNT 2
82 #define JB_RESTART_COUNT_MIN 1
83 #define JB_RESTART_COUNT_MAX 10
84 #define JB_RESTART_UPDATE 15000
85 #define JB_RESTART_UPDATE_MIN 5000
86 #define JB_RESTART_UPDATE_MAX 300000
87 // Stream setup timer
88 #define JB_SETUP_INTERVAL 180000
89 #define JB_SETUP_INTERVAL_MIN 60000
90 #define JB_SETUP_INTERVAL_MAX 600000
91 // Wait stream start timer
92 #define JB_START_INTERVAL 20000
93 #define JB_START_INTERVAL_MIN 10000
94 #define JB_START_INTERVAL_MAX 60000
95 // Stream connect timer
96 #define JB_CONNECT_INTERVAL 60000
97 #define JB_CONNECT_INTERVAL_MIN 1000
98 #define JB_CONNECT_INTERVAL_MAX 120000
99 // Stream SRV query timer
100 #define JB_SRV_INTERVAL 30000
101 #define JB_SRV_INTERVAL_MIN 10000
102 #define JB_SRV_INTERVAL_MAX 120000
103 // Ping
104 #define JB_PING_INTERVAL 600000
105 #define JB_PING_INTERVAL_MIN 60000
106 #define JB_PING_INTERVAL_MAX 3600000
107 #define JB_PING_TIMEOUT 30000
108 #define JB_PING_TIMEOUT_MIN 10000
109 #define JB_PING_TIMEOUT_MAX JB_PING_INTERVAL_MIN
110 // Idle
111 #define JB_IDLE_INTERVAL 3600000 // 1h
112 #define JB_IDLE_INTERVAL_MIN 600000 // 10min
113 #define JB_IDLE_INTERVAL_MAX 21600000 // 6h
114 // Redirect
115 #define JB_REDIRECT_COUNT 0
116 #define JB_REDIRECT_COUNT_CLIENT 2
117 #define JB_REDIRECT_MIN 0
118 #define JB_REDIRECT_MAX 10
119
120
121 /*
122 * SASL
123 */
findZero(const char * buf,unsigned int max)124 static inline unsigned int findZero(const char* buf, unsigned int max)
125 {
126 if (!buf)
127 return max + 1;
128 unsigned int pos = 0;
129 while (pos < max && buf[pos])
130 pos++;
131 return pos;
132 }
133
134 // Parse and decode a buffer containing SASL plain authentication data
135 // See RFC 4616 Section 2
136 // Format: [authzid] UTF8NUL username UTF8NUL passwd
137 // Each token must be up to 255 bytes length
splitPlainSasl(const DataBlock & buf)138 static NamedList* splitPlainSasl(const DataBlock& buf)
139 {
140 const char* d = (const char*)buf.data();
141 unsigned int len = buf.length();
142 if (!len)
143 return 0;
144 String user, pwd, authzid;
145 // Use a while to break to the end
146 bool ok = false;
147 while (true) {
148 // authzid
149 unsigned int ll = findZero(d,len);
150 if (ll && (ll > 255 || ll > len))
151 break;
152 authzid.assign(d,ll);
153 if (-1 == authzid.lenUtf8())
154 break;
155 d += ll;
156 len -= ll;
157 // Username
158 if (d[0] || len < 2)
159 break;
160 ll = findZero(++d,--len);
161 if (!(ll && ll < len && ll < 256))
162 break;
163 user.assign(d,ll);
164 if (-1 == user.lenUtf8())
165 break;
166 d += ll;
167 len -= ll;
168 // Password
169 if (d[0] || len < 2)
170 break;
171 ll = findZero(++d,--len);
172 if (ll != len || ll > 255)
173 break;
174 pwd.assign(d,ll);
175 ok = (-1 != pwd.lenUtf8());
176 break;
177 }
178 if (!ok)
179 return 0;
180 NamedList* result = new NamedList("");
181 result->addParam("username",user);
182 result->addParam("response",pwd);
183 if (authzid)
184 result->addParam("authzid",authzid);
185 return result;
186 }
187
splitDigestSasl(const String & buf)188 static NamedList* splitDigestSasl(const String& buf)
189 {
190 const char* d = buf.c_str();
191 unsigned int len = buf.length();
192 NamedList* result = 0;
193 while (len) {
194 // Find '='
195 unsigned int i = 0;
196 while (i < len && d[i] != '=')
197 i++;
198 if (!i || i >= len) {
199 Debug(DebugNote,"splitDigestSasl() unexpected end of buffer '%s'",d);
200 break;
201 }
202 // Get param name and skip over '='
203 String name(d,i);
204 i++;
205 d += i;
206 len -= i;
207 XDebug(DebugAll,"splitDigestSasl() found directive='%s' rest='%s' len=%u",
208 name.c_str(),d,len);
209 String value;
210 if (len) {
211 // Find ',', handle quoted parameters
212 if (*d == '\"') {
213 if (len < 2) {
214 Debug(DebugNote,
215 "splitDigestSasl() unexpected end of buffer '%s'",d);
216 break;
217 }
218 // Find an unescaped "
219 for (i = 1; i < len; i++) {
220 if (d[i] == '"' && d[i-1] != '\\')
221 break;
222 }
223 if (i == len) {
224 Debug(DebugNote,"splitDigestSasl() unclosed '\"' found at %u",
225 buf.length() - len);
226 break;
227 }
228 // Unescape the content
229 value.assign(d + 1,i - 1);
230 int pos = -1;
231 unsigned int start = 0;
232 bool ok = true;
233 while (-1 != (value.find('\\',start))) {
234 if (pos == 0) {
235 // No character to escape: error
236 if (value.length() == 1) {
237 Debug(DebugNote,"splitDigestSasl() 2");
238 ok = false;
239 break;
240 }
241 value = value.substr(1);
242 }
243 else if ((unsigned int)pos < value.length() - 1) {
244 if (value[pos - 1] != '"') {
245 // Escaped char
246 value = value.substr(0,pos) + value.substr(0,pos + 1);
247 start = pos + 1;
248 }
249 else if (value[pos + 1] == '"') {
250 // Escaped backslash
251 value = value.substr(0,pos - 1) + "\\" + value.substr(0,pos + 2);
252 start = pos + 1;
253 }
254 else {
255 // Error
256 Debug(DebugNote,"splitDigestSasl() 3");
257 ok = false;
258 break;
259 }
260 }
261 else {
262 // No character to escape: error
263 Debug(DebugNote,"splitDigestSasl() 4");
264 ok = false;
265 break;
266 }
267 }
268 if (!ok)
269 break;
270 // Adjust buffer and length
271 if (i < len) {
272 if (i == len - 1)
273 i++;
274 else if (d[i + 1] == ',')
275 i += 2;
276 else {
277 Debug(DebugNote,"splitDigestSasl() ',' not found at %u rest=%s",
278 buf.length() - len + i + 1,d);
279 break;
280 }
281 }
282 }
283 else {
284 // Skip until ,
285 for (i = 0; i < len && d[i] != ','; i++)
286 ;
287 if (i)
288 value.assign(d,i);
289 if (i < len)
290 i++;
291 }
292 d += i;
293 len -= i;
294 }
295 if (!result)
296 result = new NamedList("");
297 XDebug(DebugAll,"splitDigestSasl() found '%s'='%s' rest='%s' len=%u",
298 name.c_str(),value.c_str(),d,len);
299 result->addParam(name,value);
300 }
301 if (len)
302 TelEngine::destruct(result);
303 return result;
304 }
305
306 // Apend a quoted directive to a string
307 // Escape the value
appendQDirective(String & buf,const String & name,const String & value)308 static inline void appendQDirective(String& buf, const String& name,
309 const String& value)
310 {
311 if (-1 == value.find('\"') && -1 == value.find('\\')) {
312 buf.append(name + "=\"" + value + "\"",",");
313 return;
314 }
315 // Replace \ with "\" and " with \"
316 // See RFC2831 7.2
317 String tmp;
318 char c = 0;
319 char* s = (char*)value.c_str();
320 while ((c = *s++)) {
321 if (c == '\"')
322 tmp << '\\' << c;
323 else if (c == '\\')
324 tmp << "\"\\\"";
325 else
326 tmp += c;
327 }
328 buf.append(name + "=\"" + tmp + "\"",",");
329 }
330
331 // Constructor
SASL(bool plain,const char * realm)332 SASL::SASL(bool plain, const char* realm)
333 : m_plain(plain), m_params(0), m_realm(realm), m_nonceCount(0)
334 {
335 }
336
337 // Set auth params
setAuthParams(const char * user,const char * pwd)338 void SASL::setAuthParams(const char* user, const char* pwd)
339 {
340 if (TelEngine::null(user) && TelEngine::null(pwd))
341 return;
342 if (!m_params)
343 m_params = new NamedList("");
344 if (!TelEngine::null(user))
345 m_params->setParam("username",user);
346 if (!TelEngine::null(pwd))
347 m_params->setParam("password",pwd);
348 }
349
350 // Build an auth response
buildAuthRsp(String & buf,const char * digestUri)351 bool SASL::buildAuthRsp(String& buf, const char* digestUri)
352 {
353 if (!m_params)
354 return false;
355
356 // Plain. See RFC 4616 Section 2
357 // Format: [authzid] UTF8NUL username UTF8NUL passwd
358 // Each token must be up to 255 bytes length
359 if (m_plain) {
360 if (!m_params)
361 return false;
362 String* user = m_params->getParam("username");
363 String* pwd = m_params->getParam("password");
364 if (!user || user->length() > 255 || !pwd || pwd->length() > 255)
365 return false;
366 DataBlock data;
367 unsigned char nul = 0;
368 data.append(&nul,1);
369 data += *user;
370 data.append(&nul,1);
371 data += *pwd;
372 Base64 base64((void*)data.data(),data.length());
373 base64.encode(buf);
374 return true;
375 }
376
377 // Digest MD5. See RFC 2831 2.1.2.1
378 String* pwd = m_params->getParam("password");
379 if (!pwd)
380 return false;
381
382 #define SASL_ADD_QDIR(n) { \
383 NamedString* tmp = m_params->getParam(n); \
384 if (tmp) \
385 appendQDirective(buf,tmp->name(),*tmp); \
386 }
387 SASL_ADD_QDIR("username")
388 SASL_ADD_QDIR("realm")
389 SASL_ADD_QDIR("nonce")
390 MD5 md5(String((unsigned int)Random::random()));
391 m_cnonce = md5.hexDigest();
392 m_params->setParam("cnonce",m_cnonce);
393 SASL_ADD_QDIR("cnonce")
394 m_nonceCount++;
395 char tmp[9];
396 ::sprintf(tmp,"%08x",m_nonceCount);
397 m_params->setParam("nc",tmp);
398 SASL_ADD_QDIR("nc")
399 m_params->setParam("qop","auth");
400 SASL_ADD_QDIR("qop")
401 m_params->setParam("digest-uri",digestUri);
402 SASL_ADD_QDIR("digest-uri")
403 String rsp;
404 buildMD5Digest(rsp,*pwd);
405 buf << ",response=" << rsp;
406 SASL_ADD_QDIR("charset")
407 SASL_ADD_QDIR("md5-sess")
408 XDebug(DebugAll,"SASL built MD5 response %s [%p]",buf.c_str(),this);
409 #undef SASL_ADD_QDIR
410 Base64 base64((void*)buf.c_str(),buf.length());
411 buf.clear();
412 base64.encode(buf);
413 return true;
414 }
415
416 // Build an MD5 challenge from this object
417 // See RFC 2831 Section 2.1.1
buildMD5Challenge(String & buf)418 bool SASL::buildMD5Challenge(String& buf)
419 {
420 String tmp;
421 if (m_realm) {
422 if (-1 == m_realm.lenUtf8())
423 return false;
424 appendQDirective(tmp,"realm",m_realm);
425 }
426 // Re-build nonce. Increase nonce count
427 m_nonce.clear();
428 m_nonce << (int)Time::msecNow() << (int)Random::random();
429 MD5 md5(m_nonce);
430 m_nonce = md5.hexDigest();
431 m_nonceCount++;
432 tmp.append("nonce=\"" + m_nonce + "\"",",");
433 tmp << ",qop=\"auth\"";
434 tmp << ",charset=\"utf-8\"";
435 tmp << ",algorithm=\"md5-sess\"";
436 // RFC 2831 2.1.1: The size of a digest-challenge MUST be less than 2048 bytes
437 if (tmp.length() < 2048) {
438 buf = tmp;
439 return true;
440 }
441 m_nonceCount--;
442 return false;
443 }
444
parsePlain(const DataBlock & buf)445 bool SASL::parsePlain(const DataBlock& buf)
446 {
447 #ifdef XDEBUG
448 String tmp;
449 tmp.hexify((void*)buf.data(),buf.length(),' ');
450 Debug(DebugAll,"SASL::parsePlain() %s [%p]",tmp.c_str(),this);
451 #endif
452 TelEngine::destruct(m_params);
453 m_params = splitPlainSasl(buf);
454 return m_params != 0;
455 }
456
457 // Parse and decode a buffer containing a SASL Digest MD5 challenge
parseMD5Challenge(const String & buf)458 bool SASL::parseMD5Challenge(const String& buf)
459 {
460 XDebug(DebugAll,"SASL::parseMD5Challenge() %s [%p]",buf.c_str(),this);
461 TelEngine::destruct(m_params);
462 // RFC 2831 2.1.1: The size of a digest-response MUST be less than 2048 bytes
463 if (buf.length() >= 2048) {
464 Debug(DebugNote,"SASL::parseMD5Challenge() invalid length=%u (max=2048) [%p]",
465 buf.length(),this);
466 return false;
467 }
468 m_params = splitDigestSasl(buf);
469 if (!m_params) {
470 Debug(DebugNote,"SASL::parseMD5Challenge() failed to split params [%p]",
471 this);
472 return false;
473 }
474 return true;
475 }
476
477 // Parse and decode a buffer containing a SASL Digest MD5 response
478 // See RFC 2831
parseMD5ChallengeRsp(const String & buf)479 bool SASL::parseMD5ChallengeRsp(const String& buf)
480 {
481 XDebug(DebugAll,"SASL::parseMD5ChallengeRsp() %s [%p]",buf.c_str(),this);
482 TelEngine::destruct(m_params);
483 // RFC 2831 2.1.2: The size of a digest-response MUST be less than 4096 bytes
484 if (buf.length() >= 4096) {
485 Debug(DebugNote,"SASL::parseMD5ChallengeRsp() invalid length=%u (max=4096) [%p]",
486 buf.length(),this);
487 return false;
488 }
489 m_params = splitDigestSasl(buf);
490 if (!m_params) {
491 Debug(DebugNote,"SASL::parseMD5ChallengeRsp() failed to split params [%p]",
492 this);
493 return false;
494 }
495 bool ok = false;
496 // Check realm, nonce, nonce count
497 // Use a while to break to the end
498 while (true) {
499 String* tmp = m_params->getParam("realm");
500 if (!tmp || *tmp != m_realm) {
501 Debug(DebugNote,"SASL::parseMD5ChallengeRsp() invalid realm='%s' [%p]",
502 TelEngine::c_safe(tmp),this);
503 break;
504 }
505 tmp = m_params->getParam("nonce");
506 if (!tmp || *tmp != m_nonce) {
507 Debug(DebugNote,"SASL::parseMD5ChallengeRsp() invalid nonce='%s' [%p]",
508 TelEngine::c_safe(tmp),this);
509 break;
510 }
511 tmp = m_params->getParam("nc");
512 if (!tmp || (unsigned int)tmp->toInteger(0,16) != m_nonceCount) {
513 Debug(DebugNote,"SASL::parseMD5ChallengeRsp() invalid nonce count='%s' [%p]",
514 TelEngine::c_safe(tmp),this);
515 break;
516 }
517 ok = true;
518 break;
519 }
520 if (ok)
521 return true;
522 TelEngine::destruct(m_params);
523 return false;
524 }
525
526 // Build a Digest MD5 SASL to be sent with authentication responses
527 // See RFC 2831 2.1.2.1
buildMD5Digest(String & dest,const NamedList & params,const char * password,bool challengeRsp)528 void SASL::buildMD5Digest(String& dest, const NamedList& params,
529 const char* password, bool challengeRsp)
530 {
531 const char* nonce = params.getValue("nonce");
532 const char* cnonce = params.getValue("cnonce");
533 String qop = params.getValue("qop","auth");
534 MD5 md5;
535 md5 << params.getValue("username") << ":" << params.getValue("realm");
536 md5 << ":" << password;
537 MD5 md5A1(md5.rawDigest(),16);
538 md5A1 << ":" << nonce << ":" << cnonce;
539 const char* authzid = params.getValue("authzid");
540 if (authzid)
541 md5A1 << ":" << authzid;
542 MD5 md5A2;
543 if (challengeRsp)
544 md5A2 << "AUTHENTICATE";
545 md5A2 << ":" << params.getValue("digest-uri");
546 if (qop != "auth")
547 md5A2 << ":" << String('0',32);
548 MD5 md5Rsp;
549 md5Rsp << md5A1.hexDigest();
550 md5Rsp << ":" << nonce << ":" << params.getValue("nc");
551 md5Rsp << ":" << cnonce << ":" << qop << ":" << md5A2.hexDigest();
552 dest = md5Rsp.hexDigest();
553 }
554
555
556 /*
557 * JBConnect
558 */
559 // Constructor. Add itself to the stream's engine
JBConnect(const JBStream & stream)560 JBConnect::JBConnect(const JBStream& stream)
561 : m_status(Start), m_domain(stream.serverHost()), m_port(0),
562 m_engine(stream.engine()), m_stream(stream.toString()),
563 m_streamType((JBStream::Type)stream.type())
564 {
565 bool redir = false;
566 stream.connectAddr(m_address,m_port,m_localIp,m_status,m_srvs,&redir);
567 if (redir && m_address) {
568 char c = m_address[0];
569 if ((c < '0' || c > '9') && c != '[' && m_address[m_address.length() - 1] != ']') {
570 // Redirect to domain: replace stream domain, clear address
571 m_domain = m_address;
572 m_address.clear();
573 }
574 else {
575 // Redirect to IP address: clear stream domain
576 m_domain.clear();
577 }
578 }
579 if (m_engine)
580 m_engine->connectStatus(this,true);
581 }
582
583 // Remove itself from engine
~JBConnect()584 JBConnect::~JBConnect()
585 {
586 terminated(0,true);
587 }
588
589 // Stop the thread
stopConnect()590 void JBConnect::stopConnect()
591 {
592 Debug(m_engine,DebugStub,"JBConnect::stopConnect() not implemented!");
593 }
594
595 // Retrieve the stream name
toString() const596 const String& JBConnect::toString() const
597 {
598 return m_stream;
599 }
600
601 // Connect the socket.
connect()602 void JBConnect::connect()
603 {
604 if (!m_engine)
605 return;
606 Debug(m_engine,DebugAll,"JBConnect(%s) starting stat=%s [%p]",
607 m_stream.c_str(),lookup(m_status,s_statusName),this);
608 int port = m_port;
609 if (!port) {
610 if (m_streamType == JBStream::c2s)
611 port = XMPP_C2S_PORT;
612 else if (m_streamType == JBStream::s2s)
613 port = XMPP_S2S_PORT;
614 else {
615 Debug(m_engine,DebugNote,"JBConnect(%s) no port for %s stream [%p]",
616 m_stream.c_str(),lookup(m_streamType,JBStream::s_typeName),this);
617 return;
618 }
619 }
620 Socket* sock = 0;
621 bool stop = false;
622 advanceStatus();
623 // Try to use ip/port
624 if (m_status == Address) {
625 if (m_address && port) {
626 sock = connect(m_address,port,stop);
627 if (sock || stop || exiting(sock)) {
628 terminated(sock,false);
629 return;
630 }
631 }
632 advanceStatus();
633 }
634 if (m_status == Srv && m_domain) {
635 if (!m_srvs.skipNull()) {
636 // Get SRV records from remote party
637 String query;
638 if (m_streamType == JBStream::c2s)
639 query = "_xmpp-client._tcp.";
640 else
641 query = "_xmpp-server._tcp.";
642 query << m_domain;
643 String error;
644 // Start connecting timeout
645 if (!notifyConnecting(true,true))
646 return;
647 int code = 0;
648 if (Resolver::init())
649 code = Resolver::srvQuery(query,m_srvs,&error);
650 // Stop the timeout if not exiting
651 if (exiting(sock) || !notifyConnecting(false,true)) {
652 terminated(0,false);
653 return;
654 }
655 if (!code)
656 DDebug(m_engine,DebugAll,"JBConnect(%s) SRV query for '%s' got %u records [%p]",
657 m_stream.c_str(),query.c_str(),m_srvs.count(),this);
658 else
659 Debug(m_engine,DebugNote,"JBConnect(%s) SRV query for '%s' failed: %d '%s' [%p]",
660 m_stream.c_str(),query.c_str(),code,error.c_str(),this);
661 }
662 else
663 // Remove the first entry: we already used it
664 m_srvs.remove();
665 ObjList* o = 0;
666 while (0 != (o = m_srvs.skipNull())) {
667 SrvRecord* rec = static_cast<SrvRecord*>(o->get());
668 sock = connect(rec->address(),rec->port(),stop);
669 o->remove();
670 if (sock || stop || exiting(sock)) {
671 terminated(sock,false);
672 return;
673 }
674 }
675 advanceStatus();
676 }
677 else if (m_status == Srv)
678 advanceStatus();
679 if (m_status == Domain) {
680 // Try to resolve the domain
681 if (port && m_domain)
682 sock = connect(m_domain,port,stop);
683 advanceStatus();
684 }
685 terminated(sock,false);
686 }
687
688 // Create and try to connect a socket. Return it on success
689 // Set stop on fatal failure and return 0
connect(const char * addr,int port,bool & stop)690 Socket* JBConnect::connect(const char* addr, int port, bool& stop)
691 {
692 Socket* sock = new Socket(PF_INET,SOCK_STREAM);
693 // Bind to local ip
694 if (m_localIp) {
695 SocketAddr lip(PF_INET);
696 lip.host(m_localIp);
697 bool ok = false;
698 if (lip.host()) {
699 ok = sock->bind(lip);
700 if (!ok) {
701 String tmp;
702 Thread::errorString(tmp,sock->error());
703 Debug(m_engine,DebugNote,
704 "JBConnect(%s) failed to bind to '%s' (%s). %d '%s' [%p]",
705 m_stream.c_str(),lip.host().c_str(),m_localIp.c_str(),
706 sock->error(),tmp.c_str(),this);
707 }
708 }
709 else
710 Debug(m_engine,DebugNote,"JBConnect(%s) invalid local ip '%s' [%p]",
711 m_stream.c_str(),m_localIp.c_str(),this);
712 stop = !ok || exiting(sock);
713 if (stop) {
714 deleteSocket(sock);
715 return 0;
716 }
717 DDebug(m_engine,DebugAll,"JBConnect(%s) bound to '%s' (%s) [%p]",
718 m_stream.c_str(),lip.host().c_str(),m_localIp.c_str(),this);
719 }
720 // Use async connect
721 u_int64_t tout = 0;
722 if (m_engine)
723 tout = m_engine->m_connectTimeout * 1000;
724 if (tout && !(sock->canSelect() && sock->setBlocking(false))) {
725 tout = 0;
726 if (sock->canSelect()) {
727 String tmp;
728 Thread::errorString(tmp,sock->error());
729 Debug(m_engine,DebugInfo,
730 "JBConnect(%s) using sync connect (async set failed). %d '%s' [%p]",
731 m_stream.c_str(),sock->error(),tmp.c_str(),this);
732 }
733 else
734 Debug(m_engine,DebugInfo,
735 "JBConnect(%s) using sync connect (select() not available) [%p]",
736 m_stream.c_str(),this);
737 }
738 if (!notifyConnecting(tout == 0)) {
739 stop = true;
740 deleteSocket(sock);
741 return 0;
742 }
743 u_int64_t start = tout ? Time::now() : 0;
744 SocketAddr a(PF_INET);
745 a.host(addr);
746 a.port(port);
747 // Check exiting: it may take some time to resolve the domain
748 stop = exiting(sock);
749 if (stop)
750 return 0;
751 if (!a.host()) {
752 Debug(m_engine,DebugNote,"JBConnect(%s) failed to resolve '%s' [%p]",
753 m_stream.c_str(),addr,this);
754 deleteSocket(sock);
755 return 0;
756 }
757 unsigned int intervals = 0;
758 if (start) {
759 start = Time::now() - start;
760 if (tout > start)
761 intervals = (unsigned int)(tout - start) / Thread::idleUsec();
762 // Make sure we wait for at least 1 timeout interval
763 if (!intervals)
764 intervals = 1;
765 }
766 String domain;
767 if (a.host() != addr)
768 domain << " (" << addr << ")";
769 Debug(m_engine,DebugAll,"JBConnect(%s) attempt to connect to '%s:%d'%s [%p]",
770 m_stream.c_str(),a.host().c_str(),a.port(),domain.safe(),this);
771 bool ok = (0 != sock->connect(a));
772 bool timeout = false;
773 // Async connect in progress
774 if (!ok && sock->inProgress()) {
775 bool done = false;
776 bool event = false;
777 while (intervals && !(done || event || stop)) {
778 if (!sock->select(0,&done,&event,Thread::idleUsec()))
779 break;
780 intervals--;
781 stop = exiting(sock);
782 }
783 timeout = !intervals && !(done || event);
784 if (sock && !sock->error() && (done || event) && sock->updateError())
785 ok = !sock->error();
786 }
787 if (ok) {
788 Debug(m_engine,DebugAll,"JBConnect(%s) connected to '%s:%d'%s [%p]",
789 m_stream.c_str(),a.host().c_str(),a.port(),domain.safe(),this);
790 return sock;
791 }
792 if (sock) {
793 String reason;
794 if (timeout)
795 reason = "Timeout";
796 else {
797 String tmp;
798 Thread::errorString(tmp,sock->error());
799 reason << sock->error() << " '" << tmp << "'";
800 }
801 Debug(m_engine,DebugNote,"JBConnect(%s) failed to connect to '%s:%d'%s. %s [%p]",
802 m_stream.c_str(),a.host().c_str(),a.port(),domain.safe(),reason.safe(),this);
803 deleteSocket(sock);
804 }
805 return 0;
806 }
807
808 // Check if exiting. Release socket
exiting(Socket * & sock)809 bool JBConnect::exiting(Socket*& sock)
810 {
811 bool done = Thread::check(false) || !m_engine || m_engine->exiting();
812 if (done && sock)
813 deleteSocket(sock);
814 return done;
815 }
816
817 // Notify termination, remove from engine
terminated(Socket * sock,bool final)818 void JBConnect::terminated(Socket* sock, bool final)
819 {
820 bool done = exiting(sock);
821 JBEngine* engine = m_engine;
822 m_engine = 0;
823 // Remove from engine
824 if (engine)
825 engine->connectStatus(this,false);
826 if (done) {
827 if (!final && Thread::check(false))
828 Debug(m_engine,DebugAll,"JBConnect(%s) cancelled [%p]",m_stream.c_str(),this);
829 return;
830 }
831 JBStream* stream = engine->findStream(m_stream,m_streamType);
832 if (!final)
833 Debug(engine,DebugAll,"JBConnect(%s) terminated [%p]",m_stream.c_str(),this);
834 else if (stream)
835 Debug(engine,DebugWarn,"JBConnect(%s) abnormally terminated! [%p]",
836 m_stream.c_str(),this);
837 // Notify stream
838 if (stream) {
839 stream->connectTerminated(sock);
840 TelEngine::destruct(stream);
841 }
842 else {
843 deleteSocket(sock);
844 DDebug(engine,DebugInfo,"JBConnect(%s) stream vanished while connecting [%p]",
845 m_stream.c_str(),this);
846 }
847 }
848
849 // Notify connecting to the stream. Return false if stream vanished
notifyConnecting(bool sync,bool useCurrentStat)850 bool JBConnect::notifyConnecting(bool sync, bool useCurrentStat)
851 {
852 JBStream* stream = m_engine ? m_engine->findStream(m_stream,m_streamType) : 0;
853 if (!stream)
854 return false;
855 int stat = m_status;
856 if (!useCurrentStat) {
857 // Advertised state:
858 // Srv --> Address: we'll advance the state on retry
859 // Domain --> Start to re-start on retry
860 if (m_status == Srv)
861 stat = Address;
862 else if (m_status == Domain)
863 stat = Start;
864 }
865 bool ok = stream->connecting(sync,stat,m_srvs);
866 TelEngine::destruct(stream);
867 return ok;
868 }
869
870 // Delete a socket
deleteSocket(Socket * & sock)871 void JBConnect::deleteSocket(Socket*& sock)
872 {
873 if (!sock)
874 return;
875 sock->setReuse();
876 sock->setLinger(0);
877 delete sock;
878 sock = 0;
879 }
880
881 // Advance the status
advanceStatus()882 void JBConnect::advanceStatus()
883 {
884 if (m_status == Start)
885 m_status = Address;
886 else if (m_status == Address) {
887 if (m_domain) {
888 if (!m_port &&
889 (m_streamType == JBStream::c2s || m_streamType == JBStream::s2s))
890 m_status = Srv;
891 else
892 m_status = Domain;
893 }
894 else
895 m_status = Start;
896 }
897 else if (m_status == Srv)
898 m_status = Domain;
899 else if (m_status == Domain)
900 m_status = Start;
901 else
902 m_status = Address;
903 }
904
905
906 /*
907 * JBEngine
908 */
JBEngine(const char * name)909 JBEngine::JBEngine(const char* name)
910 : Mutex(true,"JBEngine"),
911 m_exiting(false),
912 m_restartMax(JB_RESTART_COUNT), m_restartUpdInterval(JB_RESTART_UPDATE),
913 m_setupTimeout(JB_SETUP_INTERVAL), m_startTimeout(JB_START_INTERVAL),
914 m_connectTimeout(JB_CONNECT_INTERVAL), m_srvTimeout(JB_SRV_INTERVAL),
915 m_pingInterval(JB_PING_INTERVAL), m_pingTimeout(JB_PING_TIMEOUT),
916 m_idleTimeout(0), m_pptTimeoutC2s(0), m_pptTimeout(0),
917 m_streamReadBuffer(JB_STREAMBUF), m_maxIncompleteXml(XMPP_MAX_INCOMPLETEXML),
918 m_redirectMax(JB_REDIRECT_COUNT),
919 m_hasClientTls(true), m_printXml(0), m_initialized(false)
920 {
921 debugName(name);
922 XDebug(this,DebugAll,"JBEngine [%p]",this);
923 }
924
~JBEngine()925 JBEngine::~JBEngine()
926 {
927 XDebug(this,DebugAll,"~JBEngine [%p]",this);
928 }
929
930 // Cleanup streams. Stop all threads owned by this engine. Release memory
destruct()931 void JBEngine::destruct()
932 {
933 cleanup(true,false);
934 GenObject::destruct();
935 }
936
937 // Initialize the engine's parameters
initialize(const NamedList & params)938 void JBEngine::initialize(const NamedList& params)
939 {
940 int lvl = params.getIntValue("debug_level",-1);
941 if (lvl != -1)
942 debugLevel(lvl);
943 JBClientEngine* client = YOBJECT(JBClientEngine,this);
944 String tmp = params.getValue("printxml");
945 if (!tmp && client)
946 tmp = "verbose";
947 m_printXml = tmp.toBoolean() ? -1: ((tmp == "verbose") ? 1 : 0);
948
949 m_streamReadBuffer = fixValue(params,"stream_readbuffer",
950 JB_STREAMBUF,JB_STREAMBUF_MIN,(unsigned int)-1);
951 m_maxIncompleteXml = fixValue(params,"stream_parsermaxbuffer",
952 XMPP_MAX_INCOMPLETEXML,1024,(unsigned int)-1);
953 m_restartMax = fixValue(params,"stream_restartcount",
954 JB_RESTART_COUNT,JB_RESTART_COUNT_MIN,JB_RESTART_COUNT_MAX);
955 m_restartUpdInterval = fixValue(params,"stream_restartupdateinterval",
956 JB_RESTART_UPDATE,JB_RESTART_UPDATE_MIN,JB_RESTART_UPDATE_MAX);
957 m_setupTimeout = fixValue(params,"stream_setuptimeout",
958 JB_SETUP_INTERVAL,JB_SETUP_INTERVAL_MIN,JB_SETUP_INTERVAL_MAX);
959 m_startTimeout = fixValue(params,"stream_starttimeout",
960 JB_START_INTERVAL,JB_START_INTERVAL_MIN,JB_START_INTERVAL_MAX);
961 m_connectTimeout = fixValue(params,"stream_connecttimeout",
962 JB_CONNECT_INTERVAL,JB_CONNECT_INTERVAL_MIN,JB_CONNECT_INTERVAL_MAX);
963 m_srvTimeout = fixValue(params,"stream_srvtimeout",
964 JB_SRV_INTERVAL,JB_SRV_INTERVAL_MIN,JB_SRV_INTERVAL_MAX);
965 m_pingInterval = fixValue(params,"stream_pinginterval",
966 client ? JB_PING_INTERVAL : 0,JB_PING_INTERVAL_MIN,JB_PING_INTERVAL_MAX,true);
967 m_pingTimeout = fixValue(params,"stream_pingtimeout",
968 client ? JB_PING_TIMEOUT : 0,JB_PING_TIMEOUT_MIN,JB_PING_TIMEOUT_MAX,true);
969 if (!(m_pingInterval && m_pingTimeout))
970 m_pingInterval = m_pingTimeout = 0;
971 m_idleTimeout = fixValue(params,"stream_idletimeout",
972 JB_IDLE_INTERVAL,JB_IDLE_INTERVAL_MIN,JB_IDLE_INTERVAL_MAX);
973 int defVal = JB_REDIRECT_COUNT;
974 if (client)
975 defVal = JB_REDIRECT_COUNT_CLIENT;
976 m_redirectMax = params.getIntValue("stream_redirectcount",
977 defVal,JB_REDIRECT_MIN,JB_REDIRECT_MAX);
978 m_pptTimeoutC2s = params.getIntValue("stream_ppttimeout_c2s",10000,0,120000);
979 m_pptTimeout = params.getIntValue("stream_ppttimeout",60000,0,180000);
980 m_initialized = true;
981 }
982
983 // Terminate all streams
cleanup(bool final,bool waitTerminate)984 void JBEngine::cleanup(bool final, bool waitTerminate)
985 {
986 DDebug(this,DebugAll,"JBEngine::cleanup() final=%s wait=%s",
987 String::boolText(final),String::boolText(waitTerminate));
988 dropAll(JBStream::TypeCount,JabberID::empty(),JabberID::empty(),
989 XMPPError::Shutdown);
990 lock();
991 ObjList* found = m_connect.skipNull();
992 if (found) {
993 Debug(this,DebugAll,"Terminating %u stream connect threads",m_connect.count());
994 for (ObjList* o = found; o; o = o->skipNext()) {
995 JBConnect* conn = static_cast<JBConnect*>(o->get());
996 XDebug(this,DebugAll,"Terminating connect thread (%p)",conn);
997 conn->stopConnect();
998 }
999 }
1000 unlock();
1001 if (found) {
1002 XDebug(this,DebugAll,"Waiting for stream connect threads to terminate");
1003 while (found) {
1004 Thread::yield(false);
1005 Lock lock(this);
1006 found = m_connect.skipNull();
1007 }
1008 Debug(this,DebugAll,"Stream connect threads terminated");
1009 }
1010 stopStreamSets(waitTerminate);
1011 }
1012
1013 // Accept an incoming stream connection. Build a stream
acceptConn(Socket * sock,SocketAddr & remote,JBStream::Type t,bool ssl)1014 bool JBEngine::acceptConn(Socket* sock, SocketAddr& remote, JBStream::Type t, bool ssl)
1015 {
1016 if (!sock)
1017 return false;
1018 if (exiting()) {
1019 Debug(this,DebugNote,
1020 "Can't accept connection from '%s:%d' type='%s': engine is exiting",
1021 remote.host().c_str(),remote.port(),lookup(t,JBStream::s_typeName));
1022 return false;
1023 }
1024 if (ssl && t != JBStream::c2s) {
1025 Debug(this,DebugNote,"SSL connection on non c2s stream");
1026 return false;
1027 }
1028 JBStream* s = 0;
1029 if (t == JBStream::c2s)
1030 s = new JBClientStream(this,sock,ssl);
1031 else if (t == JBStream::s2s)
1032 s = new JBServerStream(this,sock,false);
1033 else if (t == JBStream::comp)
1034 s = new JBServerStream(this,sock,true);
1035 else if (t == JBStream::cluster)
1036 s = new JBClusterStream(this,sock);
1037 if (s)
1038 addStream(s);
1039 else
1040 Debug(this,DebugNote,"Can't accept connection from '%s:%d' type='%s'",
1041 remote.host().c_str(),remote.port(),lookup(t,JBStream::s_typeName));
1042 return s != 0;
1043 }
1044
1045 // Find a stream by its name
findStream(const String & id,JBStream::Type hint)1046 JBStream* JBEngine::findStream(const String& id, JBStream::Type hint)
1047 {
1048 if (!id)
1049 return 0;
1050 RefPointer<JBStreamSetList> list[JBStream::TypeCount];
1051 getStreamLists(list,hint);
1052 for (unsigned int i = 0; i < JBStream::TypeCount; i++) {
1053 if (!list[i])
1054 continue;
1055 JBStream* stream = JBEngine::findStream(id,list[i]);
1056 if (stream) {
1057 for (; i < JBStream::TypeCount; i++)
1058 list[i] = 0;
1059 return stream;
1060 }
1061 list[i] = 0;
1062 }
1063 return 0;
1064 }
1065
1066 // Find all c2s streams whose local or remote bare jid matches a given one
findClientStreams(bool in,const JabberID & jid,int flags)1067 ObjList* JBEngine::findClientStreams(bool in, const JabberID& jid, int flags)
1068 {
1069 if (!jid.node())
1070 return 0;
1071 RefPointer<JBStreamSetList> list;
1072 getStreamList(list,JBStream::c2s);
1073 if (!list)
1074 return 0;
1075 ObjList* result = 0;
1076 list->lock();
1077 for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1078 JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1079 for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1080 JBClientStream* stream = static_cast<JBClientStream*>(s->get());
1081 // Ignore destroying streams
1082 if (stream->incoming() != in || stream->state() == JBStream::Destroy)
1083 continue;
1084 Lock lock(stream);
1085 const JabberID& sid = in ? stream->remote() : stream->local();
1086 if (sid.bare() == jid.bare() && stream->flag(flags) && stream->ref()) {
1087 if (!result)
1088 result = new ObjList;
1089 result->append(stream);
1090 }
1091 }
1092 }
1093 list->unlock();
1094 list = 0;
1095 return result;
1096 }
1097
1098 // Find all c2s streams whose local or remote bare jid matches a given one and
1099 // their resource is found in the given list
findClientStreams(bool in,const JabberID & jid,const ObjList & resources,int flags)1100 ObjList* JBEngine::findClientStreams(bool in, const JabberID& jid, const ObjList& resources,
1101 int flags)
1102 {
1103 if (!jid.node())
1104 return 0;
1105 RefPointer<JBStreamSetList> list;
1106 getStreamList(list,JBStream::c2s);
1107 if (!list)
1108 return 0;
1109 ObjList* result = 0;
1110 list->lock();
1111 for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1112 JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1113 for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1114 JBClientStream* stream = static_cast<JBClientStream*>(s->get());
1115 // Ignore destroying streams
1116 if (stream->incoming() != in || stream->state() == JBStream::Destroy)
1117 continue;
1118 Lock lock(stream);
1119 const JabberID& sid = in ? stream->remote() : stream->local();
1120 if (sid.bare() == jid.bare() && resources.find(sid.resource()) &&
1121 stream->flag(flags) && stream->ref()) {
1122 if (!result)
1123 result = new ObjList;
1124 result->append(stream);
1125 }
1126 }
1127 }
1128 list->unlock();
1129 list = 0;
1130 return result;
1131 }
1132
1133 // Find a c2s stream by its local or remote jid
findClientStream(bool in,const JabberID & jid)1134 JBClientStream* JBEngine::findClientStream(bool in, const JabberID& jid)
1135 {
1136 if (!jid.node())
1137 return 0;
1138 RefPointer<JBStreamSetList> list;
1139 getStreamList(list,JBStream::c2s);
1140 if (!list)
1141 return 0;
1142 JBClientStream* found = 0;
1143 list->lock();
1144 for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1145 JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1146 for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1147 found = static_cast<JBClientStream*>(s->get());
1148 // Ignore destroying streams
1149 if (found->incoming() != in || found->state() == JBStream::Destroy)
1150 continue;
1151 Lock lock(found);
1152 const JabberID& sid = in ? found->remote() : found->local();
1153 if (sid == jid && found->ref())
1154 break;
1155 found = 0;
1156 }
1157 if (found)
1158 break;
1159 }
1160 list->unlock();
1161 list = 0;
1162 return found;
1163 }
1164
1165 // Terminate all streams matching type and/or local/remote jid
dropAll(JBStream::Type type,const JabberID & local,const JabberID & remote,XMPPError::Type error,const char * reason)1166 unsigned int JBEngine::dropAll(JBStream::Type type, const JabberID& local,
1167 const JabberID& remote, XMPPError::Type error, const char* reason)
1168 {
1169 XDebug(this,DebugInfo,"dropAll(%s,%s,%s,%s,%s)",lookup(type,JBStream::s_typeName),
1170 local.c_str(),remote.c_str(),XMPPUtils::s_error[error].c_str(),reason);
1171 RefPointer<JBStreamSetList> list[JBStream::TypeCount];
1172 getStreamLists(list,type);
1173 unsigned int n = 0;
1174 for (unsigned int i = 0; i < JBStream::TypeCount; i++) {
1175 if (!list[i])
1176 continue;
1177 list[i]->lock();
1178 for (ObjList* o = list[i]->sets().skipNull(); o; o = o->skipNext()) {
1179 JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1180 n += set->dropAll(local,remote,error,reason);
1181 }
1182 list[i]->unlock();
1183 list[i] = 0;
1184 }
1185 DDebug(this,DebugInfo,
1186 "Dropped %u streams type=%s local=%s remote=%s error=%s reason=%s",
1187 n,lookup(type,JBStream::s_typeName),local.c_str(),remote.c_str(),
1188 XMPPUtils::s_error[error].c_str(),reason);
1189 return n;
1190 }
1191
1192 // Process an event
processEvent(JBEvent * ev)1193 void JBEngine::processEvent(JBEvent* ev)
1194 {
1195 Debug(this,DebugStub,"JBEngine::processEvent() not implemented!");
1196 returnEvent(ev);
1197 }
1198
1199 // Return an event to this engine
returnEvent(JBEvent * ev,XMPPError::Type error,const char * reason)1200 void JBEngine::returnEvent(JBEvent* ev, XMPPError::Type error, const char* reason)
1201 {
1202 if (!ev)
1203 return;
1204 // Send error when supported
1205 if (error != XMPPError::NoError)
1206 ev->sendStanzaError(error,reason);
1207 XDebug(this,DebugAll,"Deleting returned event (%p,%s)",ev,ev->name());
1208 TelEngine::destruct(ev);
1209 }
1210
1211 // Start stream TLS
encryptStream(JBStream * stream)1212 void JBEngine::encryptStream(JBStream* stream)
1213 {
1214 Debug(this,DebugStub,"JBEngine::encryptStream() not implemented!");
1215 }
1216
1217 // Connect an outgoing stream
connectStream(JBStream * stream)1218 void JBEngine::connectStream(JBStream* stream)
1219 {
1220 Debug(this,DebugStub,"JBEngine::connectStream() not implemented!");
1221 }
1222
1223 // Start stream compression
compressStream(JBStream * stream,const String & formats)1224 void JBEngine::compressStream(JBStream* stream, const String& formats)
1225 {
1226 Debug(this,DebugStub,"JBEngine::compressStream() not implemented!");
1227 }
1228
1229 // Build a dialback key
buildDialbackKey(const String & id,const String & local,const String & remote,String & key)1230 void JBEngine::buildDialbackKey(const String& id, const String& local,
1231 const String& remote, String& key)
1232 {
1233 Debug(this,DebugStub,"JBEngine::buildDialbackKey() not implemented!");
1234 }
1235
1236 // Check for duplicate stream id at a remote server
checkDupId(JBStream * stream)1237 bool JBEngine::checkDupId(JBStream* stream)
1238 {
1239 if (!stream || stream->incoming())
1240 return false;
1241 RefPointer<JBStreamSetList> list;
1242 getStreamList(list,stream->type());
1243 if (!list)
1244 return false;
1245 stream->lock();
1246 String domain = stream->remote().domain();
1247 String id = stream->id();
1248 stream->unlock();
1249 list->lock();
1250 JBStream* found = 0;
1251 for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1252 JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1253 for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1254 found = static_cast<JBStream*>(s->get());
1255 if (found != stream && found->outgoing()) {
1256 // Lock the stream: its data might change
1257 Lock lock(found);
1258 // Ignore destroying streams
1259 if (found->remote().domain() == domain &&
1260 found->id() == id && found->state() != JBStream::Destroy)
1261 break;
1262 }
1263 found = 0;
1264 }
1265 if (found)
1266 break;
1267 }
1268 list->unlock();
1269 list = 0;
1270 return found != 0;
1271 }
1272
1273 // Print XML to output
printXml(const JBStream * stream,bool send,XmlChild & xml) const1274 void JBEngine::printXml(const JBStream* stream, bool send, XmlChild& xml) const
1275 {
1276 if (!(m_printXml && debugAt(DebugInfo)))
1277 return;
1278 String s;
1279 if (m_printXml > 0)
1280 s << "\r\n-----";
1281 XMPPUtils::print(s,xml,m_printXml > 0);
1282 if (m_printXml > 0)
1283 s << "\r\n-----";
1284 const char* dir = send ? "Sending to" : "Receiving from";
1285 if (m_printXml < 0)
1286 Debug(stream,DebugInfo,"%s '%s' %s [%p]",dir,stream->remote().c_str(),s.c_str(),stream);
1287 else
1288 Debug(stream,DebugInfo,"%s '%s' [%p]%s",dir,stream->remote().c_str(),stream,s.c_str());
1289 }
1290
1291 // Print an XML fragment to output
printXml(const JBStream * stream,bool send,XmlFragment & frag) const1292 void JBEngine::printXml(const JBStream* stream, bool send, XmlFragment& frag) const
1293 {
1294 if (!(m_printXml && debugAt(DebugInfo)))
1295 return;
1296 String s;
1297 if (m_printXml > 0)
1298 s << "\r\n-----";
1299 for (ObjList* o = frag.getChildren().skipNull(); o; o = o->skipNext())
1300 XMPPUtils::print(s,*static_cast<XmlChild*>(o->get()),m_printXml > 0);
1301 if (m_printXml > 0)
1302 s << "\r\n-----";
1303 const char* dir = send ? "Sending to" : "Receiving from";
1304 if (m_printXml < 0)
1305 Debug(stream,DebugInfo,"%s '%s' %s [%p]",dir,stream->remote().c_str(),s.c_str(),stream);
1306 else
1307 Debug(stream,DebugInfo,"%s '%s' [%p]%s",dir,stream->remote().c_str(),stream,s.c_str());
1308 }
1309
1310 // Add a stream to one of the stream lists
addStream(JBStream * stream)1311 void JBEngine::addStream(JBStream* stream)
1312 {
1313 Debug(this,DebugStub,"JBEngine::addStream() not implemented!");
1314 }
1315
1316 // Remove a stream
removeStream(JBStream * stream,bool delObj)1317 void JBEngine::removeStream(JBStream* stream, bool delObj)
1318 {
1319 if (!stream)
1320 return;
1321 stopConnect(stream->toString());
1322 }
1323
1324 // Add/remove a connect stream thread when started/stopped
connectStatus(JBConnect * conn,bool started)1325 void JBEngine::connectStatus(JBConnect* conn, bool started)
1326 {
1327 if (!conn)
1328 return;
1329 Lock lock(this);
1330 if (started) {
1331 // Make sure we remove any existing connect stream with the same name
1332 stopConnect(conn->toString());
1333 m_connect.append(conn)->setDelete(false);
1334 DDebug(this,DebugAll,"Added stream connect thread (%p)",conn);
1335 }
1336 else {
1337 GenObject* o = m_connect.remove(conn,false);
1338 if (o)
1339 DDebug(this,DebugAll,"Removed stream connect thread (%p)",conn);
1340 }
1341 }
1342
1343 // Stop a connect stream
stopConnect(const String & name)1344 void JBEngine::stopConnect(const String& name)
1345 {
1346 Lock lock(this);
1347 ObjList* o = m_connect.find(name);
1348 if (!o)
1349 return;
1350 JBConnect* conn = static_cast<JBConnect*>(o->get());
1351 Debug(this,DebugAll,"Stopping stream connect thread (%p,%s)",conn,name.c_str());
1352 conn->stopConnect();
1353 o->remove(false);
1354 }
1355
1356 // Find a stream by its name in a given set list
findStream(const String & id,JBStreamSetList * list)1357 JBStream* JBEngine::findStream(const String& id, JBStreamSetList* list)
1358 {
1359 if (!list)
1360 return 0;
1361 Lock lock(list);
1362 ObjList* found = 0;
1363 for (ObjList* o = list->sets().skipNull(); !found && o; o = o->skipNext()) {
1364 JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1365 found = set->clients().find(id);
1366 }
1367 JBStream* stream = found ? static_cast<JBStream*>(found->get()) : 0;
1368 if (stream && !stream->ref())
1369 stream = 0;
1370 return stream;
1371 }
1372
1373
1374 /*
1375 * JBServerEngine
1376 */
JBServerEngine(const char * name)1377 JBServerEngine::JBServerEngine(const char* name)
1378 : JBEngine(name),
1379 m_streamIndex(0),
1380 m_c2sReceive(0), m_c2sProcess(0), m_s2sReceive(0), m_s2sProcess(0),
1381 m_compReceive(0), m_compProcess(0), m_clusterReceive(0), m_clusterProcess(0)
1382 {
1383 }
1384
~JBServerEngine()1385 JBServerEngine::~JBServerEngine()
1386 {
1387 }
1388
1389 // Terminate all streams
cleanup(bool final,bool waitTerminate)1390 void JBServerEngine::cleanup(bool final, bool waitTerminate)
1391 {
1392 JBEngine::cleanup(final,waitTerminate);
1393 DDebug(this,DebugAll,"JBServerEngine::cleanup() final=%s wait=%s",
1394 String::boolText(final),String::boolText(waitTerminate));
1395 if (!final)
1396 return;
1397 Lock lock(this);
1398 TelEngine::destruct(m_c2sReceive);
1399 TelEngine::destruct(m_c2sProcess);
1400 TelEngine::destruct(m_s2sReceive);
1401 TelEngine::destruct(m_s2sProcess);
1402 TelEngine::destruct(m_compReceive);
1403 TelEngine::destruct(m_compProcess);
1404 TelEngine::destruct(m_clusterReceive);
1405 TelEngine::destruct(m_clusterProcess);
1406 }
1407
1408 // Stop all stream sets
stopStreamSets(bool waitTerminate)1409 void JBServerEngine::stopStreamSets(bool waitTerminate)
1410 {
1411 XDebug(this,DebugAll,"JBServerEngine::stopStreamSets() wait=%s",
1412 String::boolText(waitTerminate));
1413 lock();
1414 RefPointer<JBStreamSetList> sets[] = {m_c2sReceive,m_c2sProcess,
1415 m_s2sReceive,m_s2sProcess,m_compReceive,m_compProcess,
1416 m_clusterReceive,m_clusterProcess};
1417 unlock();
1418 int n = 2 * JBStream::TypeCount;
1419 for (int i = 0; i < n; i++)
1420 if (sets[i])
1421 sets[i]->stop(0,waitTerminate);
1422 for (int j = 0; j < n; j++)
1423 sets[j] = 0;
1424 }
1425
1426 // Retrieve the list of streams of a given type
getStreamList(RefPointer<JBStreamSetList> & list,int type)1427 void JBServerEngine::getStreamList(RefPointer<JBStreamSetList>& list, int type)
1428 {
1429 Lock lock(this);
1430 if (type == JBStream::c2s)
1431 list = m_c2sReceive;
1432 else if (type == JBStream::s2s)
1433 list = m_s2sReceive;
1434 else if (type == JBStream::comp)
1435 list = m_compReceive;
1436 else if (type == JBStream::cluster)
1437 list = m_clusterReceive;
1438 }
1439
1440 // Retrieve the stream lists of a given type
getStreamListsType(int type,RefPointer<JBStreamSetList> & recv,RefPointer<JBStreamSetList> & process)1441 void JBServerEngine::getStreamListsType(int type, RefPointer<JBStreamSetList>& recv,
1442 RefPointer<JBStreamSetList>& process)
1443 {
1444 if (type == JBStream::c2s) {
1445 recv = m_c2sReceive;
1446 process = m_c2sProcess;
1447 }
1448 else if (type == JBStream::s2s) {
1449 recv = m_s2sReceive;
1450 process = m_s2sProcess;
1451 }
1452 else if (type == JBStream::comp) {
1453 recv = m_compReceive;
1454 process = m_compProcess;
1455 }
1456 else if (type == JBStream::cluster) {
1457 recv = m_clusterReceive;
1458 process = m_clusterProcess;
1459 }
1460 }
1461
1462 // Find a server to server or component stream by local/remote domain.
1463 // Skip over outgoing dialback streams
findServerStream(const String & local,const String & remote,bool out,bool auth)1464 JBServerStream* JBServerEngine::findServerStream(const String& local, const String& remote,
1465 bool out, bool auth)
1466 {
1467 if (!(local && remote))
1468 return 0;
1469 lock();
1470 RefPointer<JBStreamSetList> list[2] = {m_s2sReceive,m_compReceive};
1471 unlock();
1472 JBServerStream* stream = 0;
1473 for (int i = 0; i < 2; i++) {
1474 list[i]->lock();
1475 for (ObjList* o = list[i]->sets().skipNull(); o; o = o->skipNext()) {
1476 JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1477 for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1478 stream = static_cast<JBServerStream*>(s->get());
1479 if (stream->type() == JBStream::comp ||
1480 (out == stream->outgoing() && !stream->dialback())) {
1481 // Lock the stream: remote jid might change
1482 Lock lock(stream);
1483 if (local != stream->local()) {
1484 stream = 0;
1485 continue;
1486 }
1487 bool checkRemote = out || stream->type() == JBStream::comp;
1488 if ((checkRemote && remote == stream->remote()) ||
1489 (!checkRemote && stream->hasRemoteDomain(remote,auth))) {
1490 stream->ref();
1491 break;
1492 }
1493 }
1494 stream = 0;
1495 }
1496 if (stream)
1497 break;
1498 }
1499 list[i]->unlock();
1500 if (stream)
1501 break;
1502 }
1503 list[0] = list[1] = 0;
1504 return stream;
1505 }
1506
1507 // Create an outgoing s2s stream.
createServerStream(const String & local,const String & remote,const char * dbId,const char * dbKey,bool dbOnly,const NamedList * params)1508 JBServerStream* JBServerEngine::createServerStream(const String& local,
1509 const String& remote, const char* dbId, const char* dbKey, bool dbOnly,
1510 const NamedList* params)
1511 {
1512 if (exiting()) {
1513 Debug(this,DebugAll,"Can't create s2s local=%s remote=%s: engine is exiting",
1514 local.c_str(),remote.c_str());
1515 return 0;
1516 }
1517 JBServerStream* stream = 0;
1518 if (!dbOnly)
1519 stream = findServerStream(local,remote,true);
1520 if (!stream) {
1521 stream = new JBServerStream(this,local,remote,dbId,dbKey,dbOnly,params);
1522 stream->ref();
1523 addStream(stream);
1524 }
1525 else
1526 TelEngine::destruct(stream);
1527 return stream;
1528 }
1529
1530 // Create an outgoing comp stream
createCompStream(const String & name,const String & local,const String & remote,const NamedList * params)1531 JBServerStream* JBServerEngine::createCompStream(const String& name, const String& local,
1532 const String& remote, const NamedList* params)
1533 {
1534 if (exiting()) {
1535 Debug(this,DebugAll,"Can't create comp local=%s remote=%s: engine is exiting",
1536 local.c_str(),remote.c_str());
1537 return 0;
1538 }
1539 JBServerStream* stream = findServerStream(local,remote,true);
1540 if (!stream) {
1541 stream = new JBServerStream(this,local,remote,&name,params);
1542 stream->ref();
1543 addStream(stream);
1544 }
1545 return stream;
1546 }
1547
1548 // Find a cluster stream by remote domain
findClusterStream(const String & remote,JBClusterStream * skip)1549 JBClusterStream* JBServerEngine::findClusterStream(const String& remote,
1550 JBClusterStream* skip)
1551 {
1552 if (!remote)
1553 return 0;
1554 lock();
1555 RefPointer<JBStreamSetList> list = m_clusterReceive;
1556 unlock();
1557 JBClusterStream* stream = 0;
1558 list->lock();
1559 for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1560 JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1561 for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1562 stream = static_cast<JBClusterStream*>(s->get());
1563 if (skip != stream) {
1564 Lock lock(stream);
1565 if (stream->state() != JBStream::Destroy &&
1566 remote == stream->remote()) {
1567 stream->ref();
1568 break;
1569 }
1570 }
1571 stream = 0;
1572 }
1573 }
1574 list->unlock();
1575 list = 0;
1576 return stream;
1577 }
1578
1579 // Create an outgoing cluster stream
createClusterStream(const String & local,const String & remote,const NamedList * params)1580 JBClusterStream* JBServerEngine::createClusterStream(const String& local,
1581 const String& remote, const NamedList* params)
1582 {
1583 if (exiting()) {
1584 Debug(this,DebugAll,"Can't create cluster local=%s remote=%s: engine is exiting",
1585 local.c_str(),remote.c_str());
1586 return 0;
1587 }
1588 JBClusterStream* stream = findClusterStream(remote);
1589 if (!stream) {
1590 stream = new JBClusterStream(this,local,remote,params);
1591 stream->ref();
1592 addStream(stream);
1593 }
1594 return stream;
1595 }
1596
1597 // Terminate all incoming c2s streams matching a given JID
terminateClientStreams(const JabberID & jid,XMPPError::Type error,const char * reason)1598 unsigned int JBServerEngine::terminateClientStreams(const JabberID& jid,
1599 XMPPError::Type error, const char* reason)
1600 {
1601 unsigned int n = 0;
1602 ObjList* list = findClientStreams(true,jid);
1603 if (!list)
1604 return 0;
1605 n = list->count();
1606 DDebug(this,DebugInfo,"Terminating %u incoming c2s streams jid=%s error=%s reason=%s",
1607 n,jid.bare().c_str(),XMPPUtils::s_tag[error].c_str(),reason);
1608 for (ObjList* o = list->skipNull(); o; o = o->skipNext()) {
1609 JBClientStream* stream = static_cast<JBClientStream*>(o->get());
1610 stream->terminate(-1,true,0,error,reason);
1611 }
1612 TelEngine::destruct(list);
1613 return n;
1614 }
1615
1616 // Add a stream to one of the stream lists
addStream(JBStream * stream)1617 void JBServerEngine::addStream(JBStream* stream)
1618 {
1619 if (!stream)
1620 return;
1621 lock();
1622 RefPointer<JBStreamSetList> recv;
1623 RefPointer<JBStreamSetList> process;
1624 getStreamListsType(stream->type(),recv,process);
1625 unlock();
1626 if (recv && process) {
1627 recv->add(stream);
1628 process->add(stream);
1629 }
1630 else
1631 DDebug(this,DebugStub,"JBServerEngine::addStream() type='%s' not handled!",
1632 stream->typeName());
1633 recv = 0;
1634 process = 0;
1635 TelEngine::destruct(stream);
1636 }
1637
1638 // Remove a stream
removeStream(JBStream * stream,bool delObj)1639 void JBServerEngine::removeStream(JBStream* stream, bool delObj)
1640 {
1641 if (!stream)
1642 return;
1643 JBEngine::removeStream(stream,delObj);
1644 lock();
1645 DDebug(this,DebugAll,"JBServerEngine::removeStream(%p,%u) id=%s",
1646 stream,delObj,stream->toString().c_str());
1647 RefPointer<JBStreamSetList> recv;
1648 RefPointer<JBStreamSetList> process;
1649 getStreamListsType(stream->type(),recv,process);
1650 unlock();
1651 if (recv)
1652 recv->remove(stream,delObj);
1653 if (process)
1654 process->remove(stream,delObj);
1655 recv = 0;
1656 process = 0;
1657 }
1658
1659
1660 /*
1661 * JBClientEngine
1662 */
JBClientEngine(const char * name)1663 JBClientEngine::JBClientEngine(const char* name)
1664 : JBEngine(name),
1665 m_receive(0), m_process(0)
1666 {
1667 }
1668
~JBClientEngine()1669 JBClientEngine::~JBClientEngine()
1670 {
1671 }
1672
1673 // Terminate all streams
cleanup(bool final,bool waitTerminate)1674 void JBClientEngine::cleanup(bool final, bool waitTerminate)
1675 {
1676 JBEngine::cleanup(final,waitTerminate);
1677 DDebug(this,DebugAll,"JBClientEngine::cleanup() final=%s wait=%s",
1678 String::boolText(final),String::boolText(waitTerminate));
1679 if (!final)
1680 return;
1681 Lock lock(this);
1682 TelEngine::destruct(m_receive);
1683 TelEngine::destruct(m_process);
1684 }
1685
1686 // Find a stream by account
findAccount(const String & account)1687 JBClientStream* JBClientEngine::findAccount(const String& account)
1688 {
1689 if (!account)
1690 return 0;
1691 RefPointer<JBStreamSetList> list;
1692 getStreamList(list,JBStream::c2s);
1693 if (!list)
1694 return 0;
1695 JBClientStream* found = 0;
1696 list->lock();
1697 for (ObjList* o = list->sets().skipNull(); !found && o; o = o->skipNext()) {
1698 JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1699 for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1700 found = static_cast<JBClientStream*>(s->get());
1701 if (account == found->account())
1702 break;
1703 found = 0;
1704 }
1705 }
1706 if (found && !found->ref())
1707 found = 0;
1708 list->unlock();
1709 list = 0;
1710 return found;
1711 }
1712
1713 // Build an outgoing client stream
create(const String & account,const NamedList & params,const String & name)1714 JBClientStream* JBClientEngine::create(const String& account, const NamedList& params,
1715 const String& name)
1716 {
1717 if (!account)
1718 return 0;
1719 String serverHost;
1720 String username = params.getValue("username");
1721 String domain = params.getValue("domain");
1722 int pos = username.find("@");
1723 if (pos > 0) {
1724 serverHost = domain;
1725 domain = username.substr(pos + 1);
1726 username = username.substr(0,pos);
1727 }
1728 if (!domain)
1729 domain = params.getValue("server",params.getValue("address"));
1730 JabberID jid(username,domain,params.getValue("resource"));
1731 if (!jid.bare()) {
1732 Debug(this,DebugNote,"Can't create client stream: invalid jid=%s",jid.bare().c_str());
1733 return 0;
1734 }
1735 Lock lock(this);
1736 JBClientStream* stream = static_cast<JBClientStream*>(findAccount(account));
1737 if (!stream) {
1738 stream = new JBClientStream(this,jid,account,params,name,serverHost);
1739 stream->ref();
1740 addStream(stream);
1741 }
1742 else
1743 TelEngine::destruct(stream);
1744 return stream;
1745 }
1746
1747 // Add a stream to one of the stream lists
addStream(JBStream * stream)1748 void JBClientEngine::addStream(JBStream* stream)
1749 {
1750 if (!stream)
1751 return;
1752 lock();
1753 RefPointer<JBStreamSetList> recv = 0;
1754 RefPointer<JBStreamSetList> process = 0;
1755 if (stream->type() == JBStream::c2s) {
1756 recv = m_receive;
1757 process = m_process;
1758 }
1759 unlock();
1760 if (recv && process) {
1761 recv->add(stream);
1762 process->add(stream);
1763 }
1764 else
1765 DDebug(this,DebugStub,"JBClientEngine::addStream() type='%s' not handled!",
1766 stream->typeName());
1767 recv = 0;
1768 process = 0;
1769 TelEngine::destruct(stream);
1770 }
1771
1772 // Remove a stream
removeStream(JBStream * stream,bool delObj)1773 void JBClientEngine::removeStream(JBStream* stream, bool delObj)
1774 {
1775 if (!stream)
1776 return;
1777 JBEngine::removeStream(stream,delObj);
1778 lock();
1779 DDebug(this,DebugAll,"JBClientEngine::removeStream(%p,%u) id=%s",
1780 stream,delObj,stream->toString().c_str());
1781 RefPointer<JBStreamSetList> recv;
1782 RefPointer<JBStreamSetList> process;
1783 if (stream->type() == JBStream::c2s) {
1784 recv = m_receive;
1785 process = m_process;
1786 }
1787 unlock();
1788 if (recv)
1789 recv->remove(stream,delObj);
1790 if (process)
1791 process->remove(stream,delObj);
1792 recv = 0;
1793 process = 0;
1794 }
1795
1796 // Stop all stream sets
stopStreamSets(bool waitTerminate)1797 void JBClientEngine::stopStreamSets(bool waitTerminate)
1798 {
1799 XDebug(this,DebugAll,"JBClientEngine::stopStreamSets() wait=%s",
1800 String::boolText(waitTerminate));
1801 lock();
1802 RefPointer<JBStreamSetList> receive = m_receive;
1803 RefPointer<JBStreamSetList> process = m_process;
1804 unlock();
1805 if (receive)
1806 receive->stop(0,waitTerminate);
1807 if (process)
1808 process->stop(0,waitTerminate);
1809 receive = 0;
1810 process = 0;
1811 }
1812
1813 // Retrieve the list of streams of a given type
getStreamList(RefPointer<JBStreamSetList> & list,int type)1814 void JBClientEngine::getStreamList(RefPointer<JBStreamSetList>& list, int type)
1815 {
1816 if (type != JBStream::c2s)
1817 return;
1818 Lock lock(this);
1819 list = m_receive;
1820 }
1821
1822
1823 /*
1824 * JBEvent
1825 */
~JBEvent()1826 JBEvent::~JBEvent()
1827 {
1828 releaseStream(true);
1829 releaseXml(true);
1830 XDebug(DebugAll,"JBEvent::~JBEvent [%p]",this);
1831 }
1832
1833 // Get a client stream from the event's stream
clientStream()1834 JBClientStream* JBEvent::clientStream()
1835 {
1836 return m_stream ? m_stream->clientStream() : 0;
1837 }
1838
1839 // Get a server stream from the event's stream
serverStream()1840 JBServerStream* JBEvent::serverStream()
1841 {
1842 return m_stream ? m_stream->serverStream() : 0;
1843 }
1844
clusterStream()1845 JBClusterStream* JBEvent::clusterStream()
1846 {
1847 return m_stream ? m_stream->clusterStream() : 0;
1848 }
1849
1850 // Delete the underlying XmlElement(s). Release the ownership.
releaseXml(bool del)1851 XmlElement* JBEvent::releaseXml(bool del)
1852 {
1853 m_child = 0;
1854 if (del) {
1855 TelEngine::destruct(m_element);
1856 return 0;
1857 }
1858 XmlElement* tmp = m_element;
1859 m_element = 0;
1860 return tmp;
1861 }
1862
releaseStream(bool release)1863 void JBEvent::releaseStream(bool release)
1864 {
1865 if (m_link && m_stream) {
1866 m_stream->eventTerminated(this);
1867 m_link = false;
1868 }
1869 if (release)
1870 TelEngine::destruct(m_stream);
1871 }
1872
1873 // Build an 'iq' result stanza from event data
buildIqResult(bool addTags,XmlElement * child)1874 XmlElement* JBEvent::buildIqResult(bool addTags, XmlElement* child)
1875 {
1876 XmlElement* xml = 0;
1877 if (addTags)
1878 xml = XMPPUtils::createIqResult(m_to,m_from,m_id,child);
1879 else
1880 xml = XMPPUtils::createIqResult(0,0,m_id,child);
1881 return xml;
1882 }
1883
1884 // Build and send a stanza 'result' from enclosed 'iq' element
sendIqResult(XmlElement * child)1885 bool JBEvent::sendIqResult(XmlElement* child)
1886 {
1887 if (!(m_element && m_stream && XMPPUtils::isUnprefTag(*m_element,XmlTag::Iq))) {
1888 TelEngine::destruct(child);
1889 return false;
1890 }
1891 if (m_stanzaType == "error" || m_stanzaType == "result") {
1892 TelEngine::destruct(child);
1893 return false;
1894 }
1895 XmlElement* xml = buildIqResult(true,child);
1896 bool ok = m_stream->state() == JBStream::Running ?
1897 m_stream->sendStanza(xml) : m_stream->sendStreamXml(m_stream->state(),xml);
1898 if (ok) {
1899 releaseXml(true);
1900 return true;
1901 }
1902 return false;
1903 }
1904
1905 // Build an 'iq' error stanza from event data
buildIqError(bool addTags,XMPPError::Type error,const char * reason,XMPPError::ErrorType type)1906 XmlElement* JBEvent::buildIqError(bool addTags, XMPPError::Type error, const char* reason,
1907 XMPPError::ErrorType type)
1908 {
1909 XmlElement* xml = 0;
1910 if (addTags)
1911 xml = XMPPUtils::createIq(XMPPUtils::IqError,m_to,m_from,m_id);
1912 else
1913 xml = XMPPUtils::createIq(XMPPUtils::IqError,0,0,m_id);
1914 if (!m_id)
1915 xml->addChild(releaseXml());
1916 xml->addChild(XMPPUtils::createError(type,error,reason));
1917 return xml;
1918 }
1919
1920 // Build and send a stanza error from enclosed element
sendStanzaError(XMPPError::Type error,const char * reason,XMPPError::ErrorType type)1921 bool JBEvent::sendStanzaError(XMPPError::Type error, const char* reason,
1922 XMPPError::ErrorType type)
1923 {
1924 if (!(m_element && m_stream && XMPPUtils::isStanza(*m_element)))
1925 return false;
1926 if (m_stanzaType == "error" || m_stanzaType == "result")
1927 return false;
1928 XmlElement* xml = new XmlElement(m_element->toString());
1929 xml->setAttributeValid("from",m_to);
1930 xml->setAttributeValid("to",m_from);
1931 xml->setAttributeValid("id",m_id);
1932 xml->setAttribute("type","error");
1933 xml->addChild(XMPPUtils::createError(type,error,reason));
1934 bool ok = m_stream->state() == JBStream::Running ?
1935 m_stream->sendStanza(xml) : m_stream->sendStreamXml(m_stream->state(),xml);
1936 if (ok) {
1937 releaseXml(true);
1938 return true;
1939 }
1940 return false;
1941 }
1942
init(JBStream * stream,XmlElement * element,const JabberID * from,const JabberID * to)1943 bool JBEvent::init(JBStream* stream, XmlElement* element,
1944 const JabberID* from, const JabberID* to)
1945 {
1946 bool bRet = true;
1947 if (stream && stream->ref())
1948 m_stream = stream;
1949 else
1950 bRet = false;
1951 m_element = element;
1952 if (from)
1953 m_from = *from;
1954 if (to)
1955 m_to = *to;
1956 XDebug(DebugAll,"JBEvent::init type=%s stream=(%p) xml=(%p) [%p]",
1957 name(),m_stream,m_element,this);
1958 if (!m_element)
1959 return bRet;
1960
1961 // Most elements have these parameters:
1962 m_stanzaType = m_element->getAttribute("type");
1963 if (!from)
1964 m_from = m_element->getAttribute("from");
1965 if (!to)
1966 m_to = m_element->getAttribute("to");
1967 m_id = m_element->getAttribute("id");
1968
1969 // Decode some data
1970 int t = XMPPUtils::tag(*m_element);
1971 switch (t) {
1972 case XmlTag::Message:
1973 if (m_stanzaType != "error")
1974 m_text = XMPPUtils::body(*m_element);
1975 else
1976 XMPPUtils::decodeError(m_element,m_text,m_text);
1977 break;
1978 case XmlTag::Iq:
1979 case XmlTag::Presence:
1980 if (m_stanzaType != "error")
1981 break;
1982 default:
1983 XMPPUtils::decodeError(m_element,m_text,m_text);
1984 }
1985 return bRet;
1986 }
1987
1988
1989 /*
1990 * JBStreamSet
1991 */
1992 // Constructor
JBStreamSet(JBStreamSetList * owner)1993 JBStreamSet::JBStreamSet(JBStreamSetList* owner)
1994 : Mutex(true,"JBStreamSet"),
1995 m_changed(false), m_exiting(false), m_owner(owner)
1996 {
1997 XDebug(m_owner->engine(),DebugAll,"JBStreamSet::JBStreamSet(%s) [%p]",
1998 m_owner->toString().c_str(),this);
1999 }
2000
2001 // Remove from owner
~JBStreamSet()2002 JBStreamSet::~JBStreamSet()
2003 {
2004 if (m_clients.skipNull())
2005 Debug(m_owner->engine(),DebugCrit,
2006 "JBStreamSet(%s) destroyed while owning %u streams [%p]",
2007 m_owner->toString().c_str(),m_clients.count(),this);
2008 m_owner->remove(this);
2009 XDebug(m_owner->engine(),DebugAll,"JBStreamSet::~JBStreamSet(%s) [%p]",
2010 m_owner->toString().c_str(),this);
2011 }
2012
2013 // Add a stream to the set. The stream's reference counter will be increased
add(JBStream * client)2014 bool JBStreamSet::add(JBStream* client)
2015 {
2016 if (!client)
2017 return false;
2018 Lock lock(this);
2019 if (m_exiting || (m_owner->maxStreams() && m_clients.count() >= m_owner->maxStreams()) ||
2020 !client->ref())
2021 return false;
2022 m_clients.append(client);
2023 m_changed = true;
2024 DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) added (%p,'%s') type=%s [%p]",
2025 m_owner->toString().c_str(),client,client->name(),client->typeName(),this);
2026 return true;
2027 }
2028
2029 // Remove a stream from set
remove(JBStream * client,bool delObj)2030 bool JBStreamSet::remove(JBStream* client, bool delObj)
2031 {
2032 if (!client)
2033 return false;
2034 Lock lock(this);
2035 ObjList* o = m_clients.find(client);
2036 if (!o)
2037 return false;
2038 DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) removing (%p,'%s') delObj=%u [%p]",
2039 m_owner->toString().c_str(),client,client->name(),delObj,this);
2040 o->remove(delObj);
2041 m_changed = true;
2042 return true;
2043 }
2044
2045 // Terminate all streams matching and/or local/remote jid
dropAll(const JabberID & local,const JabberID & remote,XMPPError::Type error,const char * reason)2046 unsigned int JBStreamSet::dropAll(const JabberID& local, const JabberID& remote,
2047 XMPPError::Type error, const char* reason)
2048 {
2049 DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) dropAll(%s,%s,%s,%s) [%p]",
2050 m_owner->toString().c_str(),local.c_str(),remote.c_str(),
2051 XMPPUtils::s_error[error].c_str(),reason,this);
2052 unsigned int n = 0;
2053 lock();
2054 for (ObjList* s = m_clients.skipNull(); s; s = s->skipNext()) {
2055 JBStream* stream = static_cast<JBStream*>(s->get());
2056 Lock lck(stream);
2057 bool terminate = false;
2058 if (!(local || remote))
2059 terminate = true;
2060 else {
2061 if (local)
2062 terminate = stream->local().match(local);
2063 if (remote && !terminate) {
2064 JBServerStream* s2s = stream->incoming() ? stream->serverStream() : 0;
2065 if (s2s)
2066 terminate = s2s->hasRemoteDomain(remote,false);
2067 else
2068 terminate = stream->remote().match(remote);
2069 }
2070 }
2071 if (terminate) {
2072 if (stream->state() != JBStream::Destroy)
2073 n++;
2074 stream->terminate(-1,true,0,error,reason);
2075 }
2076 }
2077 unlock();
2078 return n;
2079 }
2080
2081 // Process the list
run()2082 void JBStreamSet::run()
2083 {
2084 DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) start running [%p]",
2085 m_owner->toString().c_str(),this);
2086 ObjList* o = 0;
2087 while (true) {
2088 if (Thread::check(false)) {
2089 m_exiting = true;
2090 break;
2091 }
2092 lock();
2093 if (m_changed) {
2094 o = 0;
2095 m_changed = false;
2096 }
2097 else if (o)
2098 o = o->skipNext();
2099 if (!o)
2100 o = m_clients.skipNull();
2101 bool eof = o && !o->skipNext();
2102 RefPointer<JBStream> stream = o ? static_cast<JBStream*>(o->get()) : 0;
2103 unlock();
2104 if (stream) {
2105 process(*stream);
2106 stream = 0;
2107 }
2108 else {
2109 // Lock the owner to prevent adding a new client
2110 // Don't exit if a new client was already added
2111 Lock lock(m_owner);
2112 if (!m_changed) {
2113 m_exiting = true;
2114 break;
2115 }
2116 }
2117 if (eof) {
2118 if (m_owner->m_sleepMs)
2119 Thread::msleep(m_owner->m_sleepMs,false);
2120 else
2121 Thread::idle(false);
2122 }
2123 }
2124 DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) stop running [%p]",
2125 m_owner->toString().c_str(),this);
2126 }
2127
2128 // Start running
start()2129 bool JBStreamSet::start()
2130 {
2131 Debug(m_owner->engine(),DebugStub,"JBStreamSet(%s)::start() [%p]",
2132 m_owner->toString().c_str(),this);
2133 return false;
2134 }
2135
2136 // Stop running
stop()2137 void JBStreamSet::stop()
2138 {
2139 Debug(m_owner->engine(),DebugStub,"JBStreamSet(%s)::stop() [%p]",
2140 m_owner->toString().c_str(),this);
2141 }
2142
2143
2144 /*
2145 * JBStreamSetProcessor
2146 */
2147 // Calls stream's getEvent(). Pass a generated event to the engine
process(JBStream & stream)2148 bool JBStreamSetProcessor::process(JBStream& stream)
2149 {
2150 JBEvent* ev = stream.getEvent();
2151 if (!ev)
2152 return false;
2153 bool remove = (ev->type() == JBEvent::Destroy);
2154 m_owner->engine()->processEvent(ev);
2155 if (remove) {
2156 DDebug(m_owner->engine(),DebugAll,
2157 "JBStreamSetProcessor(%s) requesting stream (%p,%s) ref %u removal [%p]",
2158 m_owner->toString().c_str(),&stream,stream.toString().c_str(),
2159 stream.refcount(),this);
2160 m_owner->engine()->removeStream(&stream,true);
2161 }
2162 return true;
2163 }
2164
2165
2166 /*
2167 * JBStreamSetReceive
2168 */
JBStreamSetReceive(JBStreamSetList * owner)2169 JBStreamSetReceive::JBStreamSetReceive(JBStreamSetList* owner)
2170 : JBStreamSet(owner)
2171 {
2172 if (owner && owner->engine())
2173 m_buffer.assign(0,owner->engine()->streamReadBuffer());
2174 }
2175
2176 // Calls stream's readSocket()
process(JBStream & stream)2177 bool JBStreamSetReceive::process(JBStream& stream)
2178 {
2179 return stream.readSocket((char*)m_buffer.data(),m_buffer.length());
2180 }
2181
2182
2183 /*
2184 * JBStreamSetList
2185 */
2186 // Constructor
JBStreamSetList(JBEngine * engine,unsigned int max,unsigned int sleepMs,const char * name)2187 JBStreamSetList::JBStreamSetList(JBEngine* engine, unsigned int max,
2188 unsigned int sleepMs, const char* name)
2189 : Mutex(true,"JBStreamSetList"),
2190 m_engine(engine), m_name(name),
2191 m_max(max), m_sleepMs(sleepMs), m_streamCount(0)
2192 {
2193 XDebug(m_engine,DebugAll,"JBStreamSetList::JBStreamSetList(%s) [%p]",
2194 m_name.c_str(),this);
2195 }
2196
2197 // Destructor
~JBStreamSetList()2198 JBStreamSetList::~JBStreamSetList()
2199 {
2200 XDebug(m_engine,DebugAll,"JBStreamSetList::~JBStreamSetList(%s) [%p]",
2201 m_name.c_str(),this);
2202 }
2203
2204 // Add a stream to the list. Build a new set if there is no room in existing sets
add(JBStream * client)2205 bool JBStreamSetList::add(JBStream* client)
2206 {
2207 if (!client || m_engine->exiting())
2208 return false;
2209 Lock lock(this);
2210 for (ObjList* o = m_sets.skipNull(); o; o = o->skipNext()) {
2211 if ((static_cast<JBStreamSet*>(o->get()))->add(client)) {
2212 m_streamCount++;
2213 return true;
2214 }
2215 }
2216 // Build a new set
2217 JBStreamSet* set = build();
2218 if (!set)
2219 return false;
2220 if (!set->add(client)) {
2221 lock.drop();
2222 TelEngine::destruct(set);
2223 return false;
2224 }
2225 m_streamCount++;
2226 m_sets.append(set);
2227 Debug(m_engine,DebugAll,"JBStreamSetList(%s) added set (%p) count=%u [%p]",
2228 m_name.c_str(),set,m_sets.count(),this);
2229 lock.drop();
2230 if (!set->start())
2231 TelEngine::destruct(set);
2232 return true;
2233 }
2234
2235 // Remove a stream from list
remove(JBStream * client,bool delObj)2236 void JBStreamSetList::remove(JBStream* client, bool delObj)
2237 {
2238 if (!client)
2239 return;
2240 DDebug(m_engine,DebugAll,"JBStreamSetList(%s) removing (%p,'%s') delObj=%u [%p]",
2241 m_name.c_str(),client,client->name(),delObj,this);
2242 Lock lock(this);
2243 for (ObjList* o = m_sets.skipNull(); o; o = o->skipNext()) {
2244 if ((static_cast<JBStreamSet*>(o->get()))->remove(client,delObj)) {
2245 if (m_streamCount)
2246 m_streamCount--;
2247 return;
2248 }
2249 }
2250 }
2251
2252 // Stop one set or all sets
stop(JBStreamSet * set,bool waitTerminate)2253 void JBStreamSetList::stop(JBStreamSet* set, bool waitTerminate)
2254 {
2255 // A set will stop when all its streams will terminate
2256 // Stop it now if wait is not requested
2257 Lock lck(this);
2258 if (set) {
2259 if (set->m_owner != this)
2260 return;
2261 DDebug(m_engine,DebugAll,"JBStreamSetList(%s) stopping set (%p) [%p]",
2262 m_name.c_str(),set,this);
2263 set->dropAll();
2264 if (!waitTerminate)
2265 set->stop();
2266 lck.drop();
2267 while (true) {
2268 lock();
2269 bool ok = (0 == m_sets.find(set));
2270 unlock();
2271 if (ok)
2272 break;
2273 Thread::yield(!waitTerminate);
2274 }
2275 DDebug(m_engine,DebugAll,"JBStreamSetList(%s) stopped set (%p) [%p]",
2276 m_name.c_str(),set,this);
2277 return;
2278 }
2279 ObjList* o = m_sets.skipNull();
2280 if (!o)
2281 return;
2282 DDebug(m_engine,DebugAll,"JBStreamSetList(%s) stopping %u sets [%p]",
2283 m_name.c_str(),m_sets.count(),this);
2284 for (; o; o = o->skipNext()) {
2285 set = static_cast<JBStreamSet*>(o->get());
2286 set->dropAll();
2287 if (!waitTerminate)
2288 set->stop();
2289 }
2290 lck.drop();
2291 while (true) {
2292 lock();
2293 bool ok = (0 == m_sets.skipNull());
2294 unlock();
2295 if (ok)
2296 break;
2297 Thread::yield(!waitTerminate);
2298 }
2299 DDebug(m_engine,DebugAll,"JBStreamSetList(%s) stopped all sets [%p]",
2300 m_name.c_str(),this);
2301 }
2302
2303 // Get the string representation of this list
toString() const2304 const String& JBStreamSetList::toString() const
2305 {
2306 return m_name;
2307 }
2308
2309 // Stop all sets. Release memory
destroyed()2310 void JBStreamSetList::destroyed()
2311 {
2312 stop(0,true);
2313 RefObject::destroyed();
2314 }
2315
2316 //Remove a set from list without deleting it
remove(JBStreamSet * set)2317 void JBStreamSetList::remove(JBStreamSet* set)
2318 {
2319 if (!set)
2320 return;
2321 Lock lock(this);
2322 ObjList* o = m_sets.find(set);
2323 if (!o)
2324 return;
2325 o->remove(false);
2326 Debug(m_engine,DebugAll,"JBStreamSetList(%s) removed set (%p) count=%u [%p]",
2327 m_name.c_str(),set,m_sets.count(),this);
2328 }
2329
2330 // Build a specialized stream set. Descendants must override this method
build()2331 JBStreamSet* JBStreamSetList::build()
2332 {
2333 Debug(m_engine,DebugStub,"JBStreamSetList(%s) build() not implemented! [%p]",
2334 m_name.c_str(),this);
2335 return 0;
2336 }
2337
2338
2339 /*
2340 * JBEntityCapsList
2341 */
2342
2343 class EntityCapsRequest : public String
2344 {
2345 public:
EntityCapsRequest(const String & id,JBEntityCaps * caps)2346 inline EntityCapsRequest(const String& id, JBEntityCaps* caps)
2347 : String(id), m_caps(caps), m_expire(Time::msecNow() + 30000)
2348 {}
~EntityCapsRequest()2349 inline ~EntityCapsRequest()
2350 { TelEngine::destruct(m_caps); }
2351 JBEntityCaps* m_caps;
2352 u_int64_t m_expire;
2353 private:
EntityCapsRequest()2354 EntityCapsRequest() {}
2355 };
2356
2357 // Expire pending requests
expire(u_int64_t msecNow)2358 void JBEntityCapsList::expire(u_int64_t msecNow)
2359 {
2360 if (!m_enable)
2361 return;
2362 Lock lock(this);
2363 // Stop at the first not expired item: the other items are added after it
2364 for (ObjList* o = m_requests.skipNull(); o; o = o->skipNull()) {
2365 EntityCapsRequest* r = static_cast<EntityCapsRequest*>(o->get());
2366 if (r->m_caps && msecNow < r->m_expire)
2367 break;
2368 DDebug(DebugInfo,"JBEntityCapsList request id=%s timed out [%p]",
2369 r->toString().c_str(),this);
2370 o->remove();
2371 }
2372 }
2373
2374 // Process a response. This method is thread safe
processRsp(XmlElement * rsp,const String & id,bool ok)2375 bool JBEntityCapsList::processRsp(XmlElement* rsp, const String& id, bool ok)
2376 {
2377 if (!(rsp && id && id.startsWith(m_reqPrefix)))
2378 return false;
2379 if (!m_enable)
2380 return true;
2381 Lock lock(this);
2382 GenObject* o = m_requests.remove(id,false);
2383 if (!o) {
2384 DDebug(DebugInfo,"JBEntityCapsList::processRsp(%p,%s,%u) id not found [%p]",
2385 &rsp,id.c_str(),ok,this);
2386 return true;
2387 }
2388 while (ok) {
2389 XmlElement* query = XMPPUtils::findFirstChild(*rsp,XmlTag::Query);
2390 if (!(query && XMPPUtils::hasXmlns(*query,XMPPNamespace::DiscoInfo)))
2391 break;
2392 EntityCapsRequest* r = static_cast<EntityCapsRequest*>(o);
2393 JBEntityCaps* caps = r->m_caps;
2394 if (!caps)
2395 break;
2396 // Check node (only for XEP 0115 ver >= 1.4)
2397 if (caps->m_version == JBEntityCaps::Ver1_4) {
2398 String* node = query->getAttribute("node");
2399 if (node && *node != (caps->m_node + "#" + caps->m_data)) {
2400 DDebug(DebugAll,"JBEntityCapsList response with invalid node=%s [%p]",
2401 node->c_str(),this);
2402 break;
2403 }
2404 }
2405 caps->m_features.fromDiscoInfo(*query);
2406 // Check hash
2407 if (caps->m_version == JBEntityCaps::Ver1_4) {
2408 caps->m_features.updateEntityCaps();
2409 if (caps->m_data != caps->m_features.m_entityCapsHash) {
2410 DDebug(DebugAll,"JBEntityCapsList response with invalid hash=%s (expected=%s) [%p]",
2411 caps->m_features.m_entityCapsHash.c_str(),caps->m_data.c_str(),this);
2412 break;
2413 }
2414 }
2415 r->m_caps = 0;
2416 // OK
2417 append(caps);
2418 capsAdded(caps);
2419 break;
2420 }
2421 TelEngine::destruct(o);
2422 return true;
2423 }
2424
2425 // Request entity capabilities.
requestCaps(JBStream * stream,const char * from,const char * to,const String & id,char version,const char * node,const char * data)2426 void JBEntityCapsList::requestCaps(JBStream* stream, const char* from, const char* to,
2427 const String& id, char version, const char* node, const char* data)
2428 {
2429 if (!stream)
2430 return;
2431 Lock lock(this);
2432 // Make sure we don't send another disco info for the same id
2433 for (ObjList* o = m_requests.skipNull(); o; o = o->skipNext()) {
2434 EntityCapsRequest* r = static_cast<EntityCapsRequest*>(o->get());
2435 if (r->m_caps && id == r->m_caps)
2436 return;
2437 }
2438 String reqId;
2439 reqId << m_reqPrefix << ++m_reqIndex;
2440 m_requests.append(new EntityCapsRequest(reqId,new JBEntityCaps(id,version,node,data)));
2441 lock.drop();
2442 XmlElement* d = 0;
2443 if (version == JBEntityCaps::Ver1_4)
2444 d = XMPPUtils::createIqDisco(true,true,from,to,reqId,node,data);
2445 else
2446 d = XMPPUtils::createIqDisco(true,true,from,to,reqId);
2447 DDebug(DebugAll,"JBEntityCapsList sending request to=%s node=%s id=%s [%p]",
2448 to,node,reqId.c_str(),this);
2449 stream->sendStanza(d);
2450 }
2451
2452 // Build a document from this list
toDocument(const char * rootName)2453 XmlDocument* JBEntityCapsList::toDocument(const char* rootName)
2454 {
2455 Lock lock(this);
2456 XmlDocument* doc = new XmlDocument;
2457 XmlDeclaration* decl = new XmlDeclaration;
2458 XmlSaxParser::Error err = doc->addChild(decl);
2459 if (err != XmlSaxParser::NoError)
2460 TelEngine::destruct(decl);
2461 XmlComment* info = new XmlComment("Generated jabber entity capabilities cache");
2462 err = doc->addChild(info);
2463 if (err != XmlSaxParser::NoError)
2464 TelEngine::destruct(info);
2465 XmlElement* root = new XmlElement(rootName);
2466 err = doc->addChild(root);
2467 if (err != XmlSaxParser::NoError) {
2468 TelEngine::destruct(root);
2469 return doc;
2470 }
2471 for (ObjList* o = skipNull(); o; o = o->skipNext()) {
2472 JBEntityCaps* caps = static_cast<JBEntityCaps*>(o->get());
2473 XmlElement* item = new XmlElement(s_entityCapsItem);
2474 item->setAttribute("id",caps->c_str());
2475 item->setAttribute("version",String((int)caps->m_version));
2476 item->setAttribute("node",caps->m_node);
2477 item->setAttribute("data",caps->m_data);
2478 caps->m_features.add(*item);
2479 doc->addChild(item);
2480 }
2481 return doc;
2482 }
2483
2484 // Build this list from an XML document
fromDocument(XmlDocument & doc,const char * rootName)2485 void JBEntityCapsList::fromDocument(XmlDocument& doc, const char* rootName)
2486 {
2487 Lock lock(this);
2488 clear();
2489 m_requests.clear();
2490 XmlElement* root = doc.root();
2491 if (!root || (!TelEngine::null(rootName) && root->toString() != rootName)) {
2492 DDebug(DebugAll,"JBEntityCapsList invalid document root %p '%s' (expected=%s) [%p]",
2493 root,root ? root->tag() : "",rootName,this);
2494 return;
2495 }
2496 XmlElement* item = root->findFirstChild(&s_entityCapsItem);
2497 for (; item; item = root->findNextChild(item,&s_entityCapsItem)) {
2498 String* id = item->getAttribute("id");
2499 if (TelEngine::null(id))
2500 continue;
2501 String* tmp = item->getAttribute("version");
2502 JBEntityCaps* cap = new JBEntityCaps(*id,tmp ? tmp->toInteger(-1) : -1,
2503 item->attribute("node"),item->attribute("data"));
2504 cap->m_features.fromDiscoInfo(*item);
2505 append(cap);
2506 }
2507 capsAdded(0);
2508 }
2509
2510 // Process an element containing an entity capabily child.
2511 // Request capabilities if not found in the list
processCaps(String & capsId,XmlElement * xml,JBStream * stream,const char * from,const char * to)2512 bool JBEntityCapsList::processCaps(String& capsId, XmlElement* xml, JBStream* stream,
2513 const char* from, const char* to)
2514 {
2515 if (!(m_enable && xml))
2516 return false;
2517 char version = 0;
2518 String* node = 0;
2519 String* ver = 0;
2520 String* ext = 0;
2521 if (!decodeCaps(*xml,version,node,ver,ext))
2522 return false;
2523 JBEntityCaps::buildId(capsId,version,*node,*ver,ext);
2524 Lock lock(this);
2525 JBEntityCaps* caps = findCaps(capsId);
2526 if (caps)
2527 return true;
2528 // Hack for google (doesn't support disco info, supports only disco info with node)
2529 if (version == JBEntityCaps::Ver1_3 &&
2530 (*node == s_googleTalkNode || *node == s_googleMailNode ||
2531 *node == s_googleAndroidNode || *node == s_googleAndroidNode2)) {
2532 caps = new JBEntityCaps(capsId,version,*node,*ver);
2533 if (ext) {
2534 ObjList* list = ext->split(' ',false);
2535 if (list->find("voice-v1")) {
2536 caps->m_features.add(XMPPNamespace::JingleSession);
2537 caps->m_features.add(XMPPNamespace::JingleAudio);
2538 }
2539 TelEngine::destruct(list);
2540 }
2541 append(caps);
2542 capsAdded(caps);
2543 return true;
2544 }
2545 if (stream)
2546 requestCaps(stream,from,to,capsId,version,*node,*ver);
2547 return stream != 0;
2548 }
2549
2550 // Add capabilities to a list.
addCaps(NamedList & list,JBEntityCaps & caps)2551 void JBEntityCapsList::addCaps(NamedList& list, JBEntityCaps& caps)
2552 {
2553 #define CHECK_NS(ns,param) \
2554 if (caps.hasFeature(ns)) { \
2555 params->append(param,","); \
2556 list.addParam(param,String::boolText(true)); \
2557 }
2558 int jingleVersion = -1;
2559 if (caps.m_features.get(XMPPNamespace::Jingle))
2560 jingleVersion = 1;
2561 else if (caps.m_features.get(XMPPNamespace::JingleSession) ||
2562 caps.m_features.get(XMPPNamespace::JingleVoiceV1))
2563 jingleVersion = 0;
2564 NamedString* params = new NamedString("caps.params");
2565 list.addParam("caps.id",caps.toString());
2566 list.addParam(params);
2567 if (jingleVersion != -1) {
2568 params->append("caps.jingle_version");
2569 list.addParam("caps.jingle_version",String(jingleVersion));
2570 if (caps.hasAudio()) {
2571 params->append("caps.audio",",");
2572 list.addParam("caps.audio",String::boolText(true));
2573 }
2574 switch (jingleVersion) {
2575 case 1:
2576 CHECK_NS(XMPPNamespace::JingleTransfer,"caps.calltransfer");
2577 CHECK_NS(XMPPNamespace::JingleAppsFileTransfer,"caps.filetransfer");
2578 break;
2579 case 0:
2580 break;
2581 }
2582 CHECK_NS(XMPPNamespace::FileInfoShare,"caps.fileinfoshare");
2583 CHECK_NS(XMPPNamespace::ResultSetMngt,"caps.resultsetmngt");
2584 }
2585 CHECK_NS(XMPPNamespace::Muc,"caps.muc");
2586 #undef CHECK_NS
2587 }
2588
2589 // Load (reset) this list from an XML document file.
loadXmlDoc(const char * file,DebugEnabler * enabler)2590 bool JBEntityCapsList::loadXmlDoc(const char* file, DebugEnabler* enabler)
2591 {
2592 if (!m_enable)
2593 return false;
2594 XmlDocument d;
2595 int io = 0;
2596 DDebug(enabler,DebugAll,"Loading entity caps from '%s'",file);
2597 XmlSaxParser::Error err = d.loadFile(file,&io);
2598 if (err == XmlSaxParser::NoError) {
2599 fromDocument(d);
2600 return true;
2601 }
2602 String error;
2603 if (err == XmlSaxParser::IOError) {
2604 String tmp;
2605 Thread::errorString(tmp,io);
2606 error << " " << io << " '" << tmp << "'";
2607 }
2608 Debug(enabler,DebugNote,"Failed to load entity caps from '%s': %s%s",
2609 file,XmlSaxParser::getError(err),error.safe());
2610 return false;
2611 }
2612
2613 // Save this list to an XML document file.
saveXmlDoc(const char * file,DebugEnabler * enabler)2614 bool JBEntityCapsList::saveXmlDoc(const char* file, DebugEnabler* enabler)
2615 {
2616 DDebug(enabler,DebugAll,"Saving entity caps to '%s'",file);
2617 if (TelEngine::null(file))
2618 return false;
2619 XmlDocument* doc = toDocument();
2620 int res = doc->saveFile(file,true," ");
2621 if (res)
2622 Debug(enabler,DebugNote,"Failed to save entity caps to '%s'",file);
2623 delete doc;
2624 return res == 0;
2625 }
2626
2627 // Check if an XML element has a 'c' entity capability child and process it
decodeCaps(const XmlElement & xml,char & version,String * & node,String * & ver,String * & ext)2628 bool JBEntityCapsList::decodeCaps(const XmlElement& xml, char& version, String*& node,
2629 String*& ver, String*& ext)
2630 {
2631 // Find the first entity caps child with valid node and ext
2632 XmlElement* c = 0;
2633 while (true) {
2634 c = XMPPUtils::findNextChild(xml,c,XmlTag::EntityCapsTag,
2635 XMPPNamespace::EntityCaps);
2636 if (!c)
2637 break;
2638 if (TelEngine::null(c->getAttribute("node")) ||
2639 TelEngine::null(c->getAttribute("ver")))
2640 continue;
2641 break;
2642 }
2643 if (!c)
2644 return false;
2645 // Check for a subsequent child with new entity caps if the first one is an old version
2646 if (!c->getAttribute("hash")) {
2647 XmlElement* s = c;
2648 while (true) {
2649 s = XMPPUtils::findNextChild(xml,s,XmlTag::EntityCapsTag,
2650 XMPPNamespace::EntityCaps);
2651 if (!s)
2652 break;
2653 if (!s->getAttribute("hash") ||
2654 TelEngine::null(s->getAttribute("node")) ||
2655 TelEngine::null(s->getAttribute("ver")))
2656 continue;
2657 c = s;
2658 break;
2659 }
2660 }
2661 node = c->getAttribute("node");
2662 ver = c->getAttribute("ver");
2663 String* hash = c->getAttribute("hash");
2664 if (hash) {
2665 // Version 1.4 or greater
2666 if (*hash != "sha-1")
2667 return false;
2668 version = JBEntityCaps::Ver1_4;
2669 ext = 0;
2670 }
2671 else {
2672 version = JBEntityCaps::Ver1_3;
2673 ext = c->getAttribute("ext");
2674 }
2675 return true;
2676 }
2677
2678 /* vi: set ts=8 sw=4 sts=4 noet: */
2679