1 /**
2  * jbengine.cpp
3  * Yet Another Jabber Component Protocol Stack
4  * This file is part of the YATE Project http://YATE.null.ro
5  *
6  * Yet Another Telephony Engine - a fully featured software PBX and IVR
7  * Copyright (C) 2004-2014 Null Team
8  *
9  * This software is distributed under multiple licenses;
10  * see the COPYING file in the main directory for licensing
11  * information for this specific distribution.
12  *
13  * This use of this software may be subject to additional restrictions.
14  * See the LEGAL file in the main directory for details.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
19  */
20 
21 #include <yatejabber.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 
25 using namespace TelEngine;
26 
27 
fixValue(const NamedList & p,const char * param,unsigned int defVal,unsigned int min,unsigned int max,bool zero=false)28 static unsigned int fixValue(const NamedList& p, const char* param,
29     unsigned int defVal, unsigned int min, unsigned int max, bool zero = false)
30 {
31     unsigned int val = p.getIntValue(param,defVal);
32     if (!val) {
33 	if (!zero)
34 	    val = defVal;
35     }
36     else if (val < min)
37 	val = min;
38     else if (val > max)
39 	val = max;
40     return val;
41 }
42 
43 const TokenDict JBEvent::s_type[] = {
44     {"Message",         Message},
45     {"Presence",        Presence},
46     {"Iq",              Iq},
47     {"Terminated",      Terminated},
48     {"Destroy",         Destroy},
49     {"Start",           Start},
50     {"Auth",            Auth},
51     {"Bind",            Bind},
52     {"Running",         Running},
53     {"DbResult",        DbResult},
54     {"DbVerify",        DbVerify},
55     {"RegisterOk",      RegisterOk},
56     {"RegisterFailed",  RegisterFailed},
57     {"Unknown",         Unknown},
58     {0,0}
59 };
60 
61 const TokenDict JBConnect::s_statusName[] =  {
62     {"Start",     Start},
63     {"Address",   Address},
64     {"Srv",       Srv},
65     {"Domain",    Domain},
66     {0,0}
67 };
68 
69 // Entity caps item tag in document
70 static const String s_entityCapsItem = "item";
71 // Node values used by entity caps
72 static const String s_googleTalkNode = "http://www.google.com/xmpp/client/caps";
73 static const String s_googleMailNode = "http://mail.google.com/xmpp/client/caps";
74 static const String s_googleAndroidNode = "http://www.android.com/gtalk/client/caps";
75 static const String s_googleAndroidNode2 = "http://www.android.com/gtalk/client/caps2";
76 
77 // Stream read buffer
78 #define JB_STREAMBUF                8192
79 #define JB_STREAMBUF_MIN            1024
80 // Stream restart counter
81 #define JB_RESTART_COUNT               2
82 #define JB_RESTART_COUNT_MIN           1
83 #define JB_RESTART_COUNT_MAX          10
84 #define JB_RESTART_UPDATE          15000
85 #define JB_RESTART_UPDATE_MIN       5000
86 #define JB_RESTART_UPDATE_MAX     300000
87 // Stream setup timer
88 #define JB_SETUP_INTERVAL         180000
89 #define JB_SETUP_INTERVAL_MIN      60000
90 #define JB_SETUP_INTERVAL_MAX     600000
91 // Wait stream start timer
92 #define JB_START_INTERVAL          20000
93 #define JB_START_INTERVAL_MIN      10000
94 #define JB_START_INTERVAL_MAX      60000
95 // Stream connect timer
96 #define JB_CONNECT_INTERVAL        60000
97 #define JB_CONNECT_INTERVAL_MIN     1000
98 #define JB_CONNECT_INTERVAL_MAX   120000
99 // Stream SRV query timer
100 #define JB_SRV_INTERVAL            30000
101 #define JB_SRV_INTERVAL_MIN        10000
102 #define JB_SRV_INTERVAL_MAX       120000
103 // Ping
104 #define JB_PING_INTERVAL          600000
105 #define JB_PING_INTERVAL_MIN       60000
106 #define JB_PING_INTERVAL_MAX     3600000
107 #define JB_PING_TIMEOUT            30000
108 #define JB_PING_TIMEOUT_MIN        10000
109 #define JB_PING_TIMEOUT_MAX        JB_PING_INTERVAL_MIN
110 // Idle
111 #define JB_IDLE_INTERVAL         3600000 // 1h
112 #define JB_IDLE_INTERVAL_MIN      600000 // 10min
113 #define JB_IDLE_INTERVAL_MAX    21600000 // 6h
114 // Redirect
115 #define JB_REDIRECT_COUNT              0
116 #define JB_REDIRECT_COUNT_CLIENT       2
117 #define JB_REDIRECT_MIN	               0
118 #define JB_REDIRECT_MAX	              10
119 
120 
121 /*
122  * SASL
123  */
findZero(const char * buf,unsigned int max)124 static inline unsigned int findZero(const char* buf, unsigned int max)
125 {
126     if (!buf)
127 	return max + 1;
128     unsigned int pos = 0;
129     while (pos < max && buf[pos])
130 	pos++;
131     return pos;
132 }
133 
134 // Parse and decode a buffer containing SASL plain authentication data
135 // See RFC 4616 Section 2
136 // Format: [authzid] UTF8NUL username UTF8NUL passwd
137 // Each token must be up to 255 bytes length
splitPlainSasl(const DataBlock & buf)138 static NamedList* splitPlainSasl(const DataBlock& buf)
139 {
140     const char* d = (const char*)buf.data();
141     unsigned int len = buf.length();
142     if (!len)
143 	return 0;
144     String user, pwd, authzid;
145     // Use a while to break to the end
146     bool ok = false;
147     while (true) {
148 	// authzid
149 	unsigned int ll = findZero(d,len);
150 	if (ll && (ll > 255 || ll > len))
151 	    break;
152 	authzid.assign(d,ll);
153 	if (-1 == authzid.lenUtf8())
154 	    break;
155 	d += ll;
156 	len -= ll;
157 	// Username
158 	if (d[0] || len < 2)
159 	    break;
160 	ll = findZero(++d,--len);
161 	if (!(ll && ll < len && ll < 256))
162 	    break;
163 	user.assign(d,ll);
164 	if (-1 == user.lenUtf8())
165 	    break;
166 	d += ll;
167 	len -= ll;
168 	// Password
169 	if (d[0] || len < 2)
170 	    break;
171 	ll = findZero(++d,--len);
172 	if (ll != len || ll > 255)
173 	    break;
174 	pwd.assign(d,ll);
175 	ok = (-1 != pwd.lenUtf8());
176 	break;
177     }
178     if (!ok)
179 	return 0;
180     NamedList* result = new NamedList("");
181     result->addParam("username",user);
182     result->addParam("response",pwd);
183     if (authzid)
184 	result->addParam("authzid",authzid);
185     return result;
186 }
187 
splitDigestSasl(const String & buf)188 static NamedList* splitDigestSasl(const String& buf)
189 {
190     const char* d = buf.c_str();
191     unsigned int len = buf.length();
192     NamedList* result = 0;
193     while (len) {
194 	// Find '='
195 	unsigned int i = 0;
196 	while (i < len && d[i] != '=')
197 	    i++;
198 	if (!i || i >= len) {
199 	    Debug(DebugNote,"splitDigestSasl() unexpected end of buffer '%s'",d);
200 	    break;
201 	}
202 	// Get param name and skip over '='
203 	String name(d,i);
204 	i++;
205 	d += i;
206 	len -= i;
207 	XDebug(DebugAll,"splitDigestSasl() found directive='%s' rest='%s' len=%u",
208 	    name.c_str(),d,len);
209 	String value;
210 	if (len) {
211 	    // Find ',', handle quoted parameters
212 	    if (*d == '\"') {
213 		if (len < 2) {
214 		    Debug(DebugNote,
215 			"splitDigestSasl() unexpected end of buffer '%s'",d);
216 		    break;
217 		}
218 		// Find an unescaped "
219 		for (i = 1; i < len; i++) {
220 		    if (d[i] == '"' && d[i-1] != '\\')
221 			break;
222 		}
223 		if (i == len) {
224 		    Debug(DebugNote,"splitDigestSasl() unclosed '\"' found at %u",
225 			buf.length() - len);
226 		    break;
227 		}
228 		// Unescape the content
229 		value.assign(d + 1,i - 1);
230 		int pos = -1;
231 		unsigned int start = 0;
232 		bool ok = true;
233 		while (-1 != (value.find('\\',start))) {
234 		    if (pos == 0) {
235 			// No character to escape: error
236 			if (value.length() == 1) {
237 			    Debug(DebugNote,"splitDigestSasl() 2");
238 			    ok = false;
239 			    break;
240 			}
241 			value = value.substr(1);
242 		    }
243 		    else if ((unsigned int)pos < value.length() - 1) {
244 			if (value[pos - 1] != '"') {
245 			    // Escaped char
246 			    value = value.substr(0,pos) + value.substr(0,pos + 1);
247 			    start = pos + 1;
248 			}
249 			else if (value[pos + 1] == '"') {
250 			    // Escaped backslash
251 			    value = value.substr(0,pos - 1) + "\\" + value.substr(0,pos + 2);
252 			    start = pos + 1;
253 			}
254 			else {
255 			    // Error
256 			    Debug(DebugNote,"splitDigestSasl() 3");
257 			    ok = false;
258 			    break;
259 			}
260 		    }
261 		    else {
262 			// No character to escape: error
263 			Debug(DebugNote,"splitDigestSasl() 4");
264 			ok = false;
265 			break;
266 		    }
267 		}
268 		if (!ok)
269 		    break;
270 		// Adjust buffer and length
271 		if (i < len) {
272 		    if (i == len - 1)
273 			i++;
274 		    else if (d[i + 1] == ',')
275 			i += 2;
276 		    else {
277 			Debug(DebugNote,"splitDigestSasl() ',' not found at %u rest=%s",
278 			    buf.length() - len + i + 1,d);
279 			break;
280 		    }
281 		}
282 	    }
283 	    else {
284 		// Skip until ,
285 		for (i = 0; i < len && d[i] != ','; i++)
286 		    ;
287 		if (i)
288 		    value.assign(d,i);
289 		if (i < len)
290 		    i++;
291 	    }
292 	    d += i;
293 	    len -= i;
294 	}
295 	if (!result)
296 	    result = new NamedList("");
297 	XDebug(DebugAll,"splitDigestSasl() found '%s'='%s' rest='%s' len=%u",
298 	    name.c_str(),value.c_str(),d,len);
299 	result->addParam(name,value);
300     }
301     if (len)
302 	TelEngine::destruct(result);
303     return result;
304 }
305 
306 // Apend a quoted directive to a string
307 // Escape the value
appendQDirective(String & buf,const String & name,const String & value)308 static inline void appendQDirective(String& buf, const String& name,
309     const String& value)
310 {
311     if (-1 == value.find('\"') && -1 == value.find('\\')) {
312 	buf.append(name + "=\"" + value + "\"",",");
313 	return;
314     }
315     // Replace \ with "\" and " with \"
316     // See RFC2831 7.2
317     String tmp;
318     char c = 0;
319     char* s = (char*)value.c_str();
320     while ((c = *s++)) {
321 	if (c == '\"')
322 	    tmp << '\\' << c;
323 	else if (c == '\\')
324 	    tmp << "\"\\\"";
325 	else
326 	    tmp += c;
327     }
328     buf.append(name + "=\"" + tmp + "\"",",");
329 }
330 
331 // Constructor
SASL(bool plain,const char * realm)332 SASL::SASL(bool plain, const char* realm)
333     : m_plain(plain), m_params(0), m_realm(realm), m_nonceCount(0)
334 {
335 }
336 
337 // Set auth params
setAuthParams(const char * user,const char * pwd)338 void SASL::setAuthParams(const char* user, const char* pwd)
339 {
340     if (TelEngine::null(user) &&  TelEngine::null(pwd))
341 	return;
342     if (!m_params)
343 	m_params = new NamedList("");
344     if (!TelEngine::null(user))
345 	m_params->setParam("username",user);
346     if (!TelEngine::null(pwd))
347 	m_params->setParam("password",pwd);
348 }
349 
350 // Build an auth response
buildAuthRsp(String & buf,const char * digestUri)351 bool SASL::buildAuthRsp(String& buf, const char* digestUri)
352 {
353     if (!m_params)
354 	return false;
355 
356     // Plain. See RFC 4616 Section 2
357     // Format: [authzid] UTF8NUL username UTF8NUL passwd
358     // Each token must be up to 255 bytes length
359     if (m_plain) {
360 	if (!m_params)
361 	    return false;
362 	String* user = m_params->getParam("username");
363 	String* pwd = m_params->getParam("password");
364 	if (!user || user->length() > 255 || !pwd || pwd->length() > 255)
365 	    return false;
366 	DataBlock data;
367 	unsigned char nul = 0;
368 	data.append(&nul,1);
369 	data += *user;
370 	data.append(&nul,1);
371 	data += *pwd;
372 	Base64 base64((void*)data.data(),data.length());
373 	base64.encode(buf);
374 	return true;
375     }
376 
377     // Digest MD5. See RFC 2831 2.1.2.1
378     String* pwd = m_params->getParam("password");
379     if (!pwd)
380 	return false;
381 
382 #define SASL_ADD_QDIR(n) { \
383     NamedString* tmp = m_params->getParam(n); \
384     if (tmp) \
385 	appendQDirective(buf,tmp->name(),*tmp); \
386 }
387     SASL_ADD_QDIR("username")
388     SASL_ADD_QDIR("realm")
389     SASL_ADD_QDIR("nonce")
390     MD5 md5(String((unsigned int)Random::random()));
391     m_cnonce = md5.hexDigest();
392     m_params->setParam("cnonce",m_cnonce);
393     SASL_ADD_QDIR("cnonce")
394     m_nonceCount++;
395     char tmp[9];
396     ::sprintf(tmp,"%08x",m_nonceCount);
397     m_params->setParam("nc",tmp);
398     SASL_ADD_QDIR("nc")
399     m_params->setParam("qop","auth");
400     SASL_ADD_QDIR("qop")
401     m_params->setParam("digest-uri",digestUri);
402     SASL_ADD_QDIR("digest-uri")
403     String rsp;
404     buildMD5Digest(rsp,*pwd);
405     buf << ",response=" << rsp;
406     SASL_ADD_QDIR("charset")
407     SASL_ADD_QDIR("md5-sess")
408     XDebug(DebugAll,"SASL built MD5 response %s [%p]",buf.c_str(),this);
409 #undef SASL_ADD_QDIR
410     Base64 base64((void*)buf.c_str(),buf.length());
411     buf.clear();
412     base64.encode(buf);
413     return true;
414 }
415 
416 // Build an MD5 challenge from this object
417 // See RFC 2831 Section 2.1.1
buildMD5Challenge(String & buf)418 bool SASL::buildMD5Challenge(String& buf)
419 {
420     String tmp;
421     if (m_realm) {
422 	if (-1 == m_realm.lenUtf8())
423 	    return false;
424 	appendQDirective(tmp,"realm",m_realm);
425     }
426     // Re-build nonce. Increase nonce count
427     m_nonce.clear();
428     m_nonce << (int)Time::msecNow() << (int)Random::random();
429     MD5 md5(m_nonce);
430     m_nonce = md5.hexDigest();
431     m_nonceCount++;
432     tmp.append("nonce=\"" + m_nonce + "\"",",");
433     tmp << ",qop=\"auth\"";
434     tmp << ",charset=\"utf-8\"";
435     tmp << ",algorithm=\"md5-sess\"";
436     // RFC 2831 2.1.1: The size of a digest-challenge MUST be less than 2048 bytes
437     if (tmp.length() < 2048) {
438 	buf = tmp;
439 	return true;
440     }
441     m_nonceCount--;
442     return false;
443 }
444 
parsePlain(const DataBlock & buf)445 bool SASL::parsePlain(const DataBlock& buf)
446 {
447 #ifdef XDEBUG
448     String tmp;
449     tmp.hexify((void*)buf.data(),buf.length(),' ');
450     Debug(DebugAll,"SASL::parsePlain() %s [%p]",tmp.c_str(),this);
451 #endif
452     TelEngine::destruct(m_params);
453     m_params = splitPlainSasl(buf);
454     return m_params != 0;
455 }
456 
457 // Parse and decode a buffer containing a SASL Digest MD5 challenge
parseMD5Challenge(const String & buf)458 bool SASL::parseMD5Challenge(const String& buf)
459 {
460     XDebug(DebugAll,"SASL::parseMD5Challenge() %s [%p]",buf.c_str(),this);
461     TelEngine::destruct(m_params);
462     // RFC 2831 2.1.1: The size of a digest-response MUST be less than 2048 bytes
463     if (buf.length() >= 2048) {
464 	Debug(DebugNote,"SASL::parseMD5Challenge() invalid length=%u (max=2048) [%p]",
465 	    buf.length(),this);
466 	return false;
467     }
468     m_params = splitDigestSasl(buf);
469     if (!m_params) {
470 	Debug(DebugNote,"SASL::parseMD5Challenge() failed to split params [%p]",
471 	    this);
472 	return false;
473     }
474     return true;
475 }
476 
477 // Parse and decode a buffer containing a SASL Digest MD5 response
478 // See RFC 2831
parseMD5ChallengeRsp(const String & buf)479 bool SASL::parseMD5ChallengeRsp(const String& buf)
480 {
481     XDebug(DebugAll,"SASL::parseMD5ChallengeRsp() %s [%p]",buf.c_str(),this);
482     TelEngine::destruct(m_params);
483     // RFC 2831 2.1.2: The size of a digest-response MUST be less than 4096 bytes
484     if (buf.length() >= 4096) {
485 	Debug(DebugNote,"SASL::parseMD5ChallengeRsp() invalid length=%u (max=4096) [%p]",
486 	    buf.length(),this);
487 	return false;
488     }
489     m_params = splitDigestSasl(buf);
490     if (!m_params) {
491 	Debug(DebugNote,"SASL::parseMD5ChallengeRsp() failed to split params [%p]",
492 	    this);
493 	return false;
494     }
495     bool ok = false;
496     // Check realm, nonce, nonce count
497     // Use a while to break to the end
498     while (true) {
499 	String* tmp = m_params->getParam("realm");
500 	if (!tmp || *tmp != m_realm) {
501 	    Debug(DebugNote,"SASL::parseMD5ChallengeRsp() invalid realm='%s' [%p]",
502 		TelEngine::c_safe(tmp),this);
503 	    break;
504 	}
505 	tmp = m_params->getParam("nonce");
506 	if (!tmp || *tmp != m_nonce) {
507 	    Debug(DebugNote,"SASL::parseMD5ChallengeRsp() invalid nonce='%s' [%p]",
508 		TelEngine::c_safe(tmp),this);
509 	    break;
510 	}
511 	tmp = m_params->getParam("nc");
512 	if (!tmp || (unsigned int)tmp->toInteger(0,16) != m_nonceCount) {
513 	    Debug(DebugNote,"SASL::parseMD5ChallengeRsp() invalid nonce count='%s' [%p]",
514 		TelEngine::c_safe(tmp),this);
515 	    break;
516 	}
517 	ok = true;
518 	break;
519     }
520     if (ok)
521 	return true;
522     TelEngine::destruct(m_params);
523     return false;
524 }
525 
526 // Build a Digest MD5 SASL to be sent with authentication responses
527 // See RFC 2831 2.1.2.1
buildMD5Digest(String & dest,const NamedList & params,const char * password,bool challengeRsp)528 void SASL::buildMD5Digest(String& dest, const NamedList& params,
529     const char* password, bool challengeRsp)
530 {
531     const char* nonce = params.getValue("nonce");
532     const char* cnonce = params.getValue("cnonce");
533     String qop = params.getValue("qop","auth");
534     MD5 md5;
535     md5 << params.getValue("username") << ":" << params.getValue("realm");
536     md5 << ":" << password;
537     MD5 md5A1(md5.rawDigest(),16);
538     md5A1 << ":" << nonce << ":" << cnonce;
539     const char* authzid = params.getValue("authzid");
540     if (authzid)
541 	md5A1 << ":" << authzid;
542     MD5 md5A2;
543     if (challengeRsp)
544 	md5A2 << "AUTHENTICATE";
545     md5A2 << ":" << params.getValue("digest-uri");
546     if (qop != "auth")
547 	md5A2 << ":" << String('0',32);
548     MD5 md5Rsp;
549     md5Rsp << md5A1.hexDigest();
550     md5Rsp << ":" << nonce << ":" << params.getValue("nc");
551     md5Rsp << ":" << cnonce << ":" << qop << ":" << md5A2.hexDigest();
552     dest = md5Rsp.hexDigest();
553 }
554 
555 
556 /*
557  * JBConnect
558  */
559 // Constructor. Add itself to the stream's engine
JBConnect(const JBStream & stream)560 JBConnect::JBConnect(const JBStream& stream)
561     : m_status(Start), m_domain(stream.serverHost()), m_port(0),
562     m_engine(stream.engine()), m_stream(stream.toString()),
563     m_streamType((JBStream::Type)stream.type())
564 {
565     bool redir = false;
566     stream.connectAddr(m_address,m_port,m_localIp,m_status,m_srvs,&redir);
567     if (redir && m_address) {
568 	char c = m_address[0];
569 	if ((c < '0' || c > '9') && c != '[' && m_address[m_address.length() - 1] != ']') {
570 	    // Redirect to domain: replace stream domain, clear address
571 	    m_domain = m_address;
572 	    m_address.clear();
573 	}
574 	else {
575 	    // Redirect to IP address: clear stream domain
576 	    m_domain.clear();
577 	}
578     }
579     if (m_engine)
580 	m_engine->connectStatus(this,true);
581 }
582 
583 // Remove itself from engine
~JBConnect()584 JBConnect::~JBConnect()
585 {
586     terminated(0,true);
587 }
588 
589 // Stop the thread
stopConnect()590 void JBConnect::stopConnect()
591 {
592     Debug(m_engine,DebugStub,"JBConnect::stopConnect() not implemented!");
593 }
594 
595 // Retrieve the stream name
toString() const596 const String& JBConnect::toString() const
597 {
598     return m_stream;
599 }
600 
601 // Connect the socket.
connect()602 void JBConnect::connect()
603 {
604     if (!m_engine)
605 	return;
606     Debug(m_engine,DebugAll,"JBConnect(%s) starting stat=%s [%p]",
607 	m_stream.c_str(),lookup(m_status,s_statusName),this);
608     int port = m_port;
609     if (!port) {
610 	if (m_streamType == JBStream::c2s)
611 	    port = XMPP_C2S_PORT;
612 	else if (m_streamType == JBStream::s2s)
613 	    port = XMPP_S2S_PORT;
614 	else {
615 	    Debug(m_engine,DebugNote,"JBConnect(%s) no port for %s stream [%p]",
616 		m_stream.c_str(),lookup(m_streamType,JBStream::s_typeName),this);
617 	    return;
618 	}
619     }
620     Socket* sock = 0;
621     bool stop = false;
622     advanceStatus();
623     // Try to use ip/port
624     if (m_status == Address) {
625 	if (m_address && port) {
626 	    sock = connect(m_address,port,stop);
627 	    if (sock || stop || exiting(sock)) {
628 		terminated(sock,false);
629 		return;
630 	    }
631 	}
632 	advanceStatus();
633     }
634     if (m_status == Srv && m_domain) {
635 	if (!m_srvs.skipNull()) {
636 	    // Get SRV records from remote party
637 	    String query;
638 	    if (m_streamType == JBStream::c2s)
639 		query = "_xmpp-client._tcp.";
640 	    else
641 		query = "_xmpp-server._tcp.";
642 	    query << m_domain;
643 	    String error;
644 	    // Start connecting timeout
645 	    if (!notifyConnecting(true,true))
646 		return;
647 	    int code = 0;
648 	    if (Resolver::init())
649 		code = Resolver::srvQuery(query,m_srvs,&error);
650 	    // Stop the timeout if not exiting
651 	    if (exiting(sock) || !notifyConnecting(false,true)) {
652 		terminated(0,false);
653 		return;
654 	    }
655 	    if (!code)
656 		DDebug(m_engine,DebugAll,"JBConnect(%s) SRV query for '%s' got %u records [%p]",
657 		    m_stream.c_str(),query.c_str(),m_srvs.count(),this);
658 	    else
659 		Debug(m_engine,DebugNote,"JBConnect(%s) SRV query for '%s' failed: %d '%s' [%p]",
660 		    m_stream.c_str(),query.c_str(),code,error.c_str(),this);
661 	}
662 	else
663 	    // Remove the first entry: we already used it
664 	    m_srvs.remove();
665 	ObjList* o = 0;
666 	while (0 != (o = m_srvs.skipNull())) {
667 	    SrvRecord* rec = static_cast<SrvRecord*>(o->get());
668 	    sock = connect(rec->address(),rec->port(),stop);
669 	    o->remove();
670 	    if (sock || stop || exiting(sock)) {
671 		terminated(sock,false);
672 		return;
673 	    }
674 	}
675 	advanceStatus();
676     }
677     else if (m_status == Srv)
678 	advanceStatus();
679     if (m_status == Domain) {
680 	// Try to resolve the domain
681 	if (port && m_domain)
682 	    sock = connect(m_domain,port,stop);
683 	advanceStatus();
684     }
685     terminated(sock,false);
686 }
687 
688 // Create and try to connect a socket. Return it on success
689 // Set stop on fatal failure and return 0
connect(const char * addr,int port,bool & stop)690 Socket* JBConnect::connect(const char* addr, int port, bool& stop)
691 {
692     Socket* sock = new Socket(PF_INET,SOCK_STREAM);
693     // Bind to local ip
694     if (m_localIp) {
695 	SocketAddr lip(PF_INET);
696 	lip.host(m_localIp);
697 	bool ok = false;
698 	if (lip.host()) {
699 	    ok = sock->bind(lip);
700 	    if (!ok) {
701 		String tmp;
702 		Thread::errorString(tmp,sock->error());
703 		Debug(m_engine,DebugNote,
704 		    "JBConnect(%s) failed to bind to '%s' (%s). %d '%s' [%p]",
705 		    m_stream.c_str(),lip.host().c_str(),m_localIp.c_str(),
706 		    sock->error(),tmp.c_str(),this);
707 	    }
708 	}
709 	else
710 	    Debug(m_engine,DebugNote,"JBConnect(%s) invalid local ip '%s' [%p]",
711 		m_stream.c_str(),m_localIp.c_str(),this);
712 	stop = !ok || exiting(sock);
713 	if (stop) {
714 	    deleteSocket(sock);
715 	    return 0;
716 	}
717 	DDebug(m_engine,DebugAll,"JBConnect(%s) bound to '%s' (%s) [%p]",
718 	    m_stream.c_str(),lip.host().c_str(),m_localIp.c_str(),this);
719     }
720     // Use async connect
721     u_int64_t tout = 0;
722     if (m_engine)
723 	tout = m_engine->m_connectTimeout * 1000;
724     if (tout && !(sock->canSelect() && sock->setBlocking(false))) {
725 	tout = 0;
726 	if (sock->canSelect()) {
727 	    String tmp;
728 	    Thread::errorString(tmp,sock->error());
729 	    Debug(m_engine,DebugInfo,
730 		"JBConnect(%s) using sync connect (async set failed). %d '%s' [%p]",
731 		m_stream.c_str(),sock->error(),tmp.c_str(),this);
732 	}
733 	else
734 	    Debug(m_engine,DebugInfo,
735 		"JBConnect(%s) using sync connect (select() not available) [%p]",
736 		m_stream.c_str(),this);
737     }
738     if (!notifyConnecting(tout == 0)) {
739 	stop = true;
740 	deleteSocket(sock);
741 	return 0;
742     }
743     u_int64_t start = tout ? Time::now() : 0;
744     SocketAddr a(PF_INET);
745     a.host(addr);
746     a.port(port);
747     // Check exiting: it may take some time to resolve the domain
748     stop = exiting(sock);
749     if (stop)
750 	return 0;
751     if (!a.host()) {
752 	Debug(m_engine,DebugNote,"JBConnect(%s) failed to resolve '%s' [%p]",
753 	    m_stream.c_str(),addr,this);
754 	deleteSocket(sock);
755 	return 0;
756     }
757     unsigned int intervals = 0;
758     if (start) {
759 	start = Time::now() - start;
760 	if (tout > start)
761 	    intervals = (unsigned int)(tout - start) / Thread::idleUsec();
762 	// Make sure we wait for at least 1 timeout interval
763 	if (!intervals)
764 	    intervals = 1;
765     }
766     String domain;
767     if (a.host() != addr)
768 	domain << " (" << addr << ")";
769     Debug(m_engine,DebugAll,"JBConnect(%s) attempt to connect to '%s:%d'%s [%p]",
770 	m_stream.c_str(),a.host().c_str(),a.port(),domain.safe(),this);
771     bool ok = (0 != sock->connect(a));
772     bool timeout = false;
773     // Async connect in progress
774     if (!ok && sock->inProgress()) {
775 	bool done = false;
776 	bool event = false;
777 	while (intervals && !(done || event || stop)) {
778 	    if (!sock->select(0,&done,&event,Thread::idleUsec()))
779 	        break;
780 	    intervals--;
781 	    stop = exiting(sock);
782 	}
783 	timeout = !intervals && !(done || event);
784 	if (sock && !sock->error() && (done || event) && sock->updateError())
785 	    ok = !sock->error();
786     }
787     if (ok) {
788 	Debug(m_engine,DebugAll,"JBConnect(%s) connected to '%s:%d'%s [%p]",
789 	    m_stream.c_str(),a.host().c_str(),a.port(),domain.safe(),this);
790 	return sock;
791     }
792     if (sock) {
793 	String reason;
794 	if (timeout)
795 	    reason = "Timeout";
796 	else {
797 	    String tmp;
798 	    Thread::errorString(tmp,sock->error());
799 	    reason << sock->error() << " '" << tmp << "'";
800 	}
801 	Debug(m_engine,DebugNote,"JBConnect(%s) failed to connect to '%s:%d'%s. %s [%p]",
802 	    m_stream.c_str(),a.host().c_str(),a.port(),domain.safe(),reason.safe(),this);
803 	deleteSocket(sock);
804     }
805     return 0;
806 }
807 
808 // Check if exiting. Release socket
exiting(Socket * & sock)809 bool JBConnect::exiting(Socket*& sock)
810 {
811     bool done = Thread::check(false) || !m_engine || m_engine->exiting();
812     if (done && sock)
813 	deleteSocket(sock);
814     return done;
815 }
816 
817 // Notify termination, remove from engine
terminated(Socket * sock,bool final)818 void JBConnect::terminated(Socket* sock, bool final)
819 {
820     bool done = exiting(sock);
821     JBEngine* engine = m_engine;
822     m_engine = 0;
823     // Remove from engine
824     if (engine)
825 	engine->connectStatus(this,false);
826     if (done) {
827 	if (!final && Thread::check(false))
828 	    Debug(m_engine,DebugAll,"JBConnect(%s) cancelled [%p]",m_stream.c_str(),this);
829 	return;
830     }
831     JBStream* stream = engine->findStream(m_stream,m_streamType);
832     if (!final)
833 	Debug(engine,DebugAll,"JBConnect(%s) terminated [%p]",m_stream.c_str(),this);
834     else if (stream)
835 	Debug(engine,DebugWarn,"JBConnect(%s) abnormally terminated! [%p]",
836 	    m_stream.c_str(),this);
837     // Notify stream
838     if (stream) {
839 	stream->connectTerminated(sock);
840 	TelEngine::destruct(stream);
841     }
842     else {
843 	deleteSocket(sock);
844 	DDebug(engine,DebugInfo,"JBConnect(%s) stream vanished while connecting [%p]",
845 	    m_stream.c_str(),this);
846     }
847 }
848 
849 // Notify connecting to the stream. Return false if stream vanished
notifyConnecting(bool sync,bool useCurrentStat)850 bool JBConnect::notifyConnecting(bool sync, bool useCurrentStat)
851 {
852     JBStream* stream = m_engine ? m_engine->findStream(m_stream,m_streamType) : 0;
853     if (!stream)
854 	return false;
855     int stat = m_status;
856     if (!useCurrentStat) {
857 	// Advertised state:
858 	// Srv --> Address: we'll advance the state on retry
859 	// Domain --> Start to re-start on retry
860 	if (m_status == Srv)
861 	    stat = Address;
862 	else if (m_status == Domain)
863 	    stat = Start;
864     }
865     bool ok = stream->connecting(sync,stat,m_srvs);
866     TelEngine::destruct(stream);
867     return ok;
868 }
869 
870 // Delete a socket
deleteSocket(Socket * & sock)871 void JBConnect::deleteSocket(Socket*& sock)
872 {
873     if (!sock)
874 	return;
875     sock->setReuse();
876     sock->setLinger(0);
877     delete sock;
878     sock = 0;
879 }
880 
881 // Advance the status
advanceStatus()882 void JBConnect::advanceStatus()
883 {
884     if (m_status == Start)
885 	m_status = Address;
886     else if (m_status == Address) {
887 	if (m_domain) {
888 	    if (!m_port &&
889 		(m_streamType == JBStream::c2s || m_streamType == JBStream::s2s))
890 		m_status = Srv;
891 	    else
892 		m_status = Domain;
893 	}
894 	else
895 	    m_status = Start;
896     }
897     else if (m_status == Srv)
898 	m_status = Domain;
899     else if (m_status == Domain)
900 	m_status = Start;
901     else
902 	m_status = Address;
903 }
904 
905 
906 /*
907  * JBEngine
908  */
JBEngine(const char * name)909 JBEngine::JBEngine(const char* name)
910     : Mutex(true,"JBEngine"),
911     m_exiting(false),
912     m_restartMax(JB_RESTART_COUNT), m_restartUpdInterval(JB_RESTART_UPDATE),
913     m_setupTimeout(JB_SETUP_INTERVAL), m_startTimeout(JB_START_INTERVAL),
914     m_connectTimeout(JB_CONNECT_INTERVAL), m_srvTimeout(JB_SRV_INTERVAL),
915     m_pingInterval(JB_PING_INTERVAL), m_pingTimeout(JB_PING_TIMEOUT),
916     m_idleTimeout(0), m_pptTimeoutC2s(0), m_pptTimeout(0),
917     m_streamReadBuffer(JB_STREAMBUF), m_maxIncompleteXml(XMPP_MAX_INCOMPLETEXML),
918     m_redirectMax(JB_REDIRECT_COUNT),
919     m_hasClientTls(true), m_printXml(0), m_initialized(false)
920 {
921     debugName(name);
922     XDebug(this,DebugAll,"JBEngine [%p]",this);
923 }
924 
~JBEngine()925 JBEngine::~JBEngine()
926 {
927     XDebug(this,DebugAll,"~JBEngine [%p]",this);
928 }
929 
930 // Cleanup streams. Stop all threads owned by this engine. Release memory
destruct()931 void JBEngine::destruct()
932 {
933     cleanup(true,false);
934     GenObject::destruct();
935 }
936 
937 // Initialize the engine's parameters
initialize(const NamedList & params)938 void JBEngine::initialize(const NamedList& params)
939 {
940     int lvl = params.getIntValue("debug_level",-1);
941     if (lvl != -1)
942 	debugLevel(lvl);
943     JBClientEngine* client = YOBJECT(JBClientEngine,this);
944     String tmp = params.getValue("printxml");
945     if (!tmp && client)
946 	tmp = "verbose";
947     m_printXml = tmp.toBoolean() ? -1: ((tmp == "verbose") ? 1 : 0);
948 
949     m_streamReadBuffer = fixValue(params,"stream_readbuffer",
950 	JB_STREAMBUF,JB_STREAMBUF_MIN,(unsigned int)-1);
951     m_maxIncompleteXml = fixValue(params,"stream_parsermaxbuffer",
952 	XMPP_MAX_INCOMPLETEXML,1024,(unsigned int)-1);
953     m_restartMax = fixValue(params,"stream_restartcount",
954 	JB_RESTART_COUNT,JB_RESTART_COUNT_MIN,JB_RESTART_COUNT_MAX);
955     m_restartUpdInterval = fixValue(params,"stream_restartupdateinterval",
956 	JB_RESTART_UPDATE,JB_RESTART_UPDATE_MIN,JB_RESTART_UPDATE_MAX);
957     m_setupTimeout = fixValue(params,"stream_setuptimeout",
958 	JB_SETUP_INTERVAL,JB_SETUP_INTERVAL_MIN,JB_SETUP_INTERVAL_MAX);
959     m_startTimeout = fixValue(params,"stream_starttimeout",
960 	JB_START_INTERVAL,JB_START_INTERVAL_MIN,JB_START_INTERVAL_MAX);
961     m_connectTimeout = fixValue(params,"stream_connecttimeout",
962 	JB_CONNECT_INTERVAL,JB_CONNECT_INTERVAL_MIN,JB_CONNECT_INTERVAL_MAX);
963     m_srvTimeout = fixValue(params,"stream_srvtimeout",
964 	JB_SRV_INTERVAL,JB_SRV_INTERVAL_MIN,JB_SRV_INTERVAL_MAX);
965     m_pingInterval = fixValue(params,"stream_pinginterval",
966 	client ? JB_PING_INTERVAL : 0,JB_PING_INTERVAL_MIN,JB_PING_INTERVAL_MAX,true);
967     m_pingTimeout = fixValue(params,"stream_pingtimeout",
968 	client ? JB_PING_TIMEOUT : 0,JB_PING_TIMEOUT_MIN,JB_PING_TIMEOUT_MAX,true);
969     if (!(m_pingInterval && m_pingTimeout))
970 	m_pingInterval = m_pingTimeout = 0;
971     m_idleTimeout = fixValue(params,"stream_idletimeout",
972 	JB_IDLE_INTERVAL,JB_IDLE_INTERVAL_MIN,JB_IDLE_INTERVAL_MAX);
973     int defVal = JB_REDIRECT_COUNT;
974     if (client)
975 	defVal = JB_REDIRECT_COUNT_CLIENT;
976     m_redirectMax = params.getIntValue("stream_redirectcount",
977 	defVal,JB_REDIRECT_MIN,JB_REDIRECT_MAX);
978     m_pptTimeoutC2s = params.getIntValue("stream_ppttimeout_c2s",10000,0,120000);
979     m_pptTimeout = params.getIntValue("stream_ppttimeout",60000,0,180000);
980     m_initialized = true;
981 }
982 
983 // Terminate all streams
cleanup(bool final,bool waitTerminate)984 void JBEngine::cleanup(bool final, bool waitTerminate)
985 {
986     DDebug(this,DebugAll,"JBEngine::cleanup() final=%s wait=%s",
987 	String::boolText(final),String::boolText(waitTerminate));
988     dropAll(JBStream::TypeCount,JabberID::empty(),JabberID::empty(),
989 	XMPPError::Shutdown);
990     lock();
991     ObjList* found = m_connect.skipNull();
992     if (found) {
993 	Debug(this,DebugAll,"Terminating %u stream connect threads",m_connect.count());
994 	for (ObjList* o = found; o; o = o->skipNext()) {
995 	    JBConnect* conn = static_cast<JBConnect*>(o->get());
996 	    XDebug(this,DebugAll,"Terminating connect thread (%p)",conn);
997 	    conn->stopConnect();
998 	}
999     }
1000     unlock();
1001     if (found) {
1002 	XDebug(this,DebugAll,"Waiting for stream connect threads to terminate");
1003 	while (found) {
1004 	    Thread::yield(false);
1005 	    Lock lock(this);
1006 	    found = m_connect.skipNull();
1007 	}
1008 	Debug(this,DebugAll,"Stream connect threads terminated");
1009     }
1010     stopStreamSets(waitTerminate);
1011 }
1012 
1013 // Accept an incoming stream connection. Build a stream
acceptConn(Socket * sock,SocketAddr & remote,JBStream::Type t,bool ssl)1014 bool JBEngine::acceptConn(Socket* sock, SocketAddr& remote, JBStream::Type t, bool ssl)
1015 {
1016     if (!sock)
1017 	return false;
1018     if (exiting()) {
1019 	Debug(this,DebugNote,
1020 	    "Can't accept connection from '%s:%d' type='%s': engine is exiting",
1021 	    remote.host().c_str(),remote.port(),lookup(t,JBStream::s_typeName));
1022 	return false;
1023     }
1024     if (ssl && t != JBStream::c2s) {
1025 	Debug(this,DebugNote,"SSL connection on non c2s stream");
1026 	return false;
1027     }
1028     JBStream* s = 0;
1029     if (t == JBStream::c2s)
1030 	s = new JBClientStream(this,sock,ssl);
1031     else if (t == JBStream::s2s)
1032 	s = new JBServerStream(this,sock,false);
1033     else if (t == JBStream::comp)
1034 	s = new JBServerStream(this,sock,true);
1035     else if (t == JBStream::cluster)
1036 	s = new JBClusterStream(this,sock);
1037     if (s)
1038 	addStream(s);
1039     else
1040 	Debug(this,DebugNote,"Can't accept connection from '%s:%d' type='%s'",
1041 	    remote.host().c_str(),remote.port(),lookup(t,JBStream::s_typeName));
1042     return s != 0;
1043 }
1044 
1045 // Find a stream by its name
findStream(const String & id,JBStream::Type hint)1046 JBStream* JBEngine::findStream(const String& id, JBStream::Type hint)
1047 {
1048     if (!id)
1049 	return 0;
1050     RefPointer<JBStreamSetList> list[JBStream::TypeCount];
1051     getStreamLists(list,hint);
1052     for (unsigned int i = 0; i < JBStream::TypeCount; i++) {
1053 	if (!list[i])
1054 	    continue;
1055 	JBStream* stream = JBEngine::findStream(id,list[i]);
1056 	if (stream) {
1057 	    for (; i < JBStream::TypeCount; i++)
1058 		list[i] = 0;
1059 	    return stream;
1060 	}
1061 	list[i] = 0;
1062     }
1063     return 0;
1064 }
1065 
1066 // Find all c2s streams whose local or remote bare jid matches a given one
findClientStreams(bool in,const JabberID & jid,int flags)1067 ObjList* JBEngine::findClientStreams(bool in, const JabberID& jid, int flags)
1068 {
1069     if (!jid.node())
1070 	return 0;
1071     RefPointer<JBStreamSetList> list;
1072     getStreamList(list,JBStream::c2s);
1073     if (!list)
1074 	return 0;
1075     ObjList* result = 0;
1076     list->lock();
1077     for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1078 	JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1079 	for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1080 	    JBClientStream* stream = static_cast<JBClientStream*>(s->get());
1081 	    // Ignore destroying streams
1082 	    if (stream->incoming() != in || stream->state() == JBStream::Destroy)
1083 		continue;
1084 	    Lock lock(stream);
1085 	    const JabberID& sid = in ? stream->remote() : stream->local();
1086 	    if (sid.bare() == jid.bare() && stream->flag(flags) && stream->ref()) {
1087 		if (!result)
1088 		    result = new ObjList;
1089 		result->append(stream);
1090 	    }
1091 	}
1092     }
1093     list->unlock();
1094     list = 0;
1095     return result;
1096 }
1097 
1098 // Find all c2s streams whose local or remote bare jid matches a given one and
1099 //  their resource is found in the given list
findClientStreams(bool in,const JabberID & jid,const ObjList & resources,int flags)1100 ObjList* JBEngine::findClientStreams(bool in, const JabberID& jid, const ObjList& resources,
1101     int flags)
1102 {
1103     if (!jid.node())
1104 	return 0;
1105     RefPointer<JBStreamSetList> list;
1106     getStreamList(list,JBStream::c2s);
1107     if (!list)
1108 	return 0;
1109     ObjList* result = 0;
1110     list->lock();
1111     for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1112 	JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1113 	for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1114 	    JBClientStream* stream = static_cast<JBClientStream*>(s->get());
1115 	    // Ignore destroying streams
1116 	    if (stream->incoming() != in || stream->state() == JBStream::Destroy)
1117 		continue;
1118 	    Lock lock(stream);
1119 	    const JabberID& sid = in ? stream->remote() : stream->local();
1120 	    if (sid.bare() == jid.bare() && resources.find(sid.resource()) &&
1121 		stream->flag(flags) && stream->ref()) {
1122 		if (!result)
1123 		    result = new ObjList;
1124 		result->append(stream);
1125 	    }
1126 	}
1127     }
1128     list->unlock();
1129     list = 0;
1130     return result;
1131 }
1132 
1133 // Find a c2s stream by its local or remote jid
findClientStream(bool in,const JabberID & jid)1134 JBClientStream* JBEngine::findClientStream(bool in, const JabberID& jid)
1135 {
1136     if (!jid.node())
1137 	return 0;
1138     RefPointer<JBStreamSetList> list;
1139     getStreamList(list,JBStream::c2s);
1140     if (!list)
1141 	return 0;
1142     JBClientStream* found = 0;
1143     list->lock();
1144     for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1145 	JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1146 	for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1147 	    found = static_cast<JBClientStream*>(s->get());
1148 	    // Ignore destroying streams
1149 	    if (found->incoming() != in || found->state() == JBStream::Destroy)
1150 		continue;
1151 	    Lock lock(found);
1152 	    const JabberID& sid = in ? found->remote() : found->local();
1153 	    if (sid == jid && found->ref())
1154 		break;
1155 	    found = 0;
1156 	}
1157 	if (found)
1158 	    break;
1159     }
1160     list->unlock();
1161     list = 0;
1162     return found;
1163 }
1164 
1165 // Terminate all streams matching type and/or local/remote jid
dropAll(JBStream::Type type,const JabberID & local,const JabberID & remote,XMPPError::Type error,const char * reason)1166 unsigned int JBEngine::dropAll(JBStream::Type type, const JabberID& local,
1167     const JabberID& remote, XMPPError::Type error, const char* reason)
1168 {
1169     XDebug(this,DebugInfo,"dropAll(%s,%s,%s,%s,%s)",lookup(type,JBStream::s_typeName),
1170 	local.c_str(),remote.c_str(),XMPPUtils::s_error[error].c_str(),reason);
1171     RefPointer<JBStreamSetList> list[JBStream::TypeCount];
1172     getStreamLists(list,type);
1173     unsigned int n = 0;
1174     for (unsigned int i = 0; i < JBStream::TypeCount; i++) {
1175 	if (!list[i])
1176 	    continue;
1177 	list[i]->lock();
1178 	for (ObjList* o = list[i]->sets().skipNull(); o; o = o->skipNext()) {
1179 	    JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1180 	    n += set->dropAll(local,remote,error,reason);
1181 	}
1182 	list[i]->unlock();
1183 	list[i] = 0;
1184     }
1185     DDebug(this,DebugInfo,
1186 	"Dropped %u streams type=%s local=%s remote=%s error=%s reason=%s",
1187 	n,lookup(type,JBStream::s_typeName),local.c_str(),remote.c_str(),
1188 	XMPPUtils::s_error[error].c_str(),reason);
1189     return n;
1190 }
1191 
1192 // Process an event
processEvent(JBEvent * ev)1193 void JBEngine::processEvent(JBEvent* ev)
1194 {
1195     Debug(this,DebugStub,"JBEngine::processEvent() not implemented!");
1196     returnEvent(ev);
1197 }
1198 
1199 // Return an event to this engine
returnEvent(JBEvent * ev,XMPPError::Type error,const char * reason)1200 void JBEngine::returnEvent(JBEvent* ev, XMPPError::Type error, const char* reason)
1201 {
1202     if (!ev)
1203 	return;
1204     // Send error when supported
1205     if (error != XMPPError::NoError)
1206 	ev->sendStanzaError(error,reason);
1207     XDebug(this,DebugAll,"Deleting returned event (%p,%s)",ev,ev->name());
1208     TelEngine::destruct(ev);
1209 }
1210 
1211 // Start stream TLS
encryptStream(JBStream * stream)1212 void JBEngine::encryptStream(JBStream* stream)
1213 {
1214     Debug(this,DebugStub,"JBEngine::encryptStream() not implemented!");
1215 }
1216 
1217 // Connect an outgoing stream
connectStream(JBStream * stream)1218 void JBEngine::connectStream(JBStream* stream)
1219 {
1220     Debug(this,DebugStub,"JBEngine::connectStream() not implemented!");
1221 }
1222 
1223 // Start stream compression
compressStream(JBStream * stream,const String & formats)1224 void JBEngine::compressStream(JBStream* stream, const String& formats)
1225 {
1226     Debug(this,DebugStub,"JBEngine::compressStream() not implemented!");
1227 }
1228 
1229 // Build a dialback key
buildDialbackKey(const String & id,const String & local,const String & remote,String & key)1230 void JBEngine::buildDialbackKey(const String& id, const String& local,
1231     const String& remote, String& key)
1232 {
1233     Debug(this,DebugStub,"JBEngine::buildDialbackKey() not implemented!");
1234 }
1235 
1236 // Check for duplicate stream id at a remote server
checkDupId(JBStream * stream)1237 bool JBEngine::checkDupId(JBStream* stream)
1238 {
1239     if (!stream || stream->incoming())
1240 	return false;
1241     RefPointer<JBStreamSetList> list;
1242     getStreamList(list,stream->type());
1243     if (!list)
1244 	return false;
1245     stream->lock();
1246     String domain = stream->remote().domain();
1247     String id = stream->id();
1248     stream->unlock();
1249     list->lock();
1250     JBStream* found = 0;
1251     for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1252 	JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1253 	for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1254 	    found = static_cast<JBStream*>(s->get());
1255 	    if (found != stream && found->outgoing()) {
1256 		// Lock the stream: its data might change
1257 		Lock lock(found);
1258 		// Ignore destroying streams
1259 		if (found->remote().domain() == domain &&
1260 		    found->id() == id && found->state() != JBStream::Destroy)
1261 		    break;
1262 	    }
1263 	    found = 0;
1264 	}
1265 	if (found)
1266 	    break;
1267     }
1268     list->unlock();
1269     list = 0;
1270     return found != 0;
1271 }
1272 
1273 // Print XML to output
printXml(const JBStream * stream,bool send,XmlChild & xml) const1274 void JBEngine::printXml(const JBStream* stream, bool send, XmlChild& xml) const
1275 {
1276     if (!(m_printXml && debugAt(DebugInfo)))
1277 	return;
1278     String s;
1279     if (m_printXml > 0)
1280 	s << "\r\n-----";
1281     XMPPUtils::print(s,xml,m_printXml > 0);
1282     if (m_printXml > 0)
1283 	s << "\r\n-----";
1284     const char* dir = send ? "Sending to" : "Receiving from";
1285     if (m_printXml < 0)
1286 	Debug(stream,DebugInfo,"%s '%s' %s [%p]",dir,stream->remote().c_str(),s.c_str(),stream);
1287     else
1288 	Debug(stream,DebugInfo,"%s '%s' [%p]%s",dir,stream->remote().c_str(),stream,s.c_str());
1289 }
1290 
1291 // Print an XML fragment to output
printXml(const JBStream * stream,bool send,XmlFragment & frag) const1292 void JBEngine::printXml(const JBStream* stream, bool send, XmlFragment& frag) const
1293 {
1294     if (!(m_printXml && debugAt(DebugInfo)))
1295 	return;
1296     String s;
1297     if (m_printXml > 0)
1298 	s << "\r\n-----";
1299     for (ObjList* o = frag.getChildren().skipNull(); o; o = o->skipNext())
1300 	XMPPUtils::print(s,*static_cast<XmlChild*>(o->get()),m_printXml > 0);
1301     if (m_printXml > 0)
1302 	s << "\r\n-----";
1303     const char* dir = send ? "Sending to" : "Receiving from";
1304     if (m_printXml < 0)
1305 	Debug(stream,DebugInfo,"%s '%s' %s [%p]",dir,stream->remote().c_str(),s.c_str(),stream);
1306     else
1307 	Debug(stream,DebugInfo,"%s '%s' [%p]%s",dir,stream->remote().c_str(),stream,s.c_str());
1308 }
1309 
1310 // Add a stream to one of the stream lists
addStream(JBStream * stream)1311 void JBEngine::addStream(JBStream* stream)
1312 {
1313     Debug(this,DebugStub,"JBEngine::addStream() not implemented!");
1314 }
1315 
1316 // Remove a stream
removeStream(JBStream * stream,bool delObj)1317 void JBEngine::removeStream(JBStream* stream, bool delObj)
1318 {
1319     if (!stream)
1320 	return;
1321     stopConnect(stream->toString());
1322 }
1323 
1324 // Add/remove a connect stream thread when started/stopped
connectStatus(JBConnect * conn,bool started)1325 void JBEngine::connectStatus(JBConnect* conn, bool started)
1326 {
1327     if (!conn)
1328 	return;
1329     Lock lock(this);
1330     if (started) {
1331 	// Make sure we remove any existing connect stream with the same name
1332 	stopConnect(conn->toString());
1333 	m_connect.append(conn)->setDelete(false);
1334 	DDebug(this,DebugAll,"Added stream connect thread (%p)",conn);
1335     }
1336     else {
1337 	GenObject* o = m_connect.remove(conn,false);
1338 	if (o)
1339 	    DDebug(this,DebugAll,"Removed stream connect thread (%p)",conn);
1340     }
1341 }
1342 
1343 // Stop a connect stream
stopConnect(const String & name)1344 void JBEngine::stopConnect(const String& name)
1345 {
1346     Lock lock(this);
1347     ObjList* o = m_connect.find(name);
1348     if (!o)
1349 	return;
1350     JBConnect* conn = static_cast<JBConnect*>(o->get());
1351     Debug(this,DebugAll,"Stopping stream connect thread (%p,%s)",conn,name.c_str());
1352     conn->stopConnect();
1353     o->remove(false);
1354 }
1355 
1356 // Find a stream by its name in a given set list
findStream(const String & id,JBStreamSetList * list)1357 JBStream* JBEngine::findStream(const String& id, JBStreamSetList* list)
1358 {
1359     if (!list)
1360 	return 0;
1361     Lock lock(list);
1362     ObjList* found = 0;
1363     for (ObjList* o = list->sets().skipNull(); !found && o; o = o->skipNext()) {
1364 	JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1365 	found = set->clients().find(id);
1366     }
1367     JBStream* stream = found ? static_cast<JBStream*>(found->get()) : 0;
1368     if (stream && !stream->ref())
1369 	stream = 0;
1370     return stream;
1371 }
1372 
1373 
1374 /*
1375  * JBServerEngine
1376  */
JBServerEngine(const char * name)1377 JBServerEngine::JBServerEngine(const char* name)
1378     : JBEngine(name),
1379     m_streamIndex(0),
1380     m_c2sReceive(0), m_c2sProcess(0), m_s2sReceive(0), m_s2sProcess(0),
1381     m_compReceive(0), m_compProcess(0), m_clusterReceive(0), m_clusterProcess(0)
1382 {
1383 }
1384 
~JBServerEngine()1385 JBServerEngine::~JBServerEngine()
1386 {
1387 }
1388 
1389 // Terminate all streams
cleanup(bool final,bool waitTerminate)1390 void JBServerEngine::cleanup(bool final, bool waitTerminate)
1391 {
1392     JBEngine::cleanup(final,waitTerminate);
1393     DDebug(this,DebugAll,"JBServerEngine::cleanup() final=%s wait=%s",
1394 	String::boolText(final),String::boolText(waitTerminate));
1395     if (!final)
1396 	return;
1397     Lock lock(this);
1398     TelEngine::destruct(m_c2sReceive);
1399     TelEngine::destruct(m_c2sProcess);
1400     TelEngine::destruct(m_s2sReceive);
1401     TelEngine::destruct(m_s2sProcess);
1402     TelEngine::destruct(m_compReceive);
1403     TelEngine::destruct(m_compProcess);
1404     TelEngine::destruct(m_clusterReceive);
1405     TelEngine::destruct(m_clusterProcess);
1406 }
1407 
1408 // Stop all stream sets
stopStreamSets(bool waitTerminate)1409 void JBServerEngine::stopStreamSets(bool waitTerminate)
1410 {
1411     XDebug(this,DebugAll,"JBServerEngine::stopStreamSets() wait=%s",
1412 	String::boolText(waitTerminate));
1413     lock();
1414     RefPointer<JBStreamSetList> sets[] = {m_c2sReceive,m_c2sProcess,
1415 	m_s2sReceive,m_s2sProcess,m_compReceive,m_compProcess,
1416 	m_clusterReceive,m_clusterProcess};
1417     unlock();
1418     int n = 2 * JBStream::TypeCount;
1419     for (int i = 0; i < n; i++)
1420 	if (sets[i])
1421 	    sets[i]->stop(0,waitTerminate);
1422     for (int j = 0; j < n; j++)
1423 	sets[j] = 0;
1424 }
1425 
1426 // Retrieve the list of streams of a given type
getStreamList(RefPointer<JBStreamSetList> & list,int type)1427 void JBServerEngine::getStreamList(RefPointer<JBStreamSetList>& list, int type)
1428 {
1429     Lock lock(this);
1430     if (type == JBStream::c2s)
1431 	list = m_c2sReceive;
1432     else if (type == JBStream::s2s)
1433 	list = m_s2sReceive;
1434     else if (type == JBStream::comp)
1435 	list = m_compReceive;
1436     else if (type == JBStream::cluster)
1437 	list = m_clusterReceive;
1438 }
1439 
1440 // Retrieve the stream lists of a given type
getStreamListsType(int type,RefPointer<JBStreamSetList> & recv,RefPointer<JBStreamSetList> & process)1441 void JBServerEngine::getStreamListsType(int type, RefPointer<JBStreamSetList>& recv,
1442     RefPointer<JBStreamSetList>& process)
1443 {
1444     if (type == JBStream::c2s) {
1445 	recv = m_c2sReceive;
1446 	process = m_c2sProcess;
1447     }
1448     else if (type == JBStream::s2s) {
1449 	recv = m_s2sReceive;
1450 	process = m_s2sProcess;
1451     }
1452     else if (type == JBStream::comp) {
1453 	recv = m_compReceive;
1454 	process = m_compProcess;
1455     }
1456     else if (type == JBStream::cluster) {
1457 	recv = m_clusterReceive;
1458 	process = m_clusterProcess;
1459     }
1460 }
1461 
1462 // Find a server to server or component stream by local/remote domain.
1463 // Skip over outgoing dialback streams
findServerStream(const String & local,const String & remote,bool out,bool auth)1464 JBServerStream* JBServerEngine::findServerStream(const String& local, const String& remote,
1465     bool out, bool auth)
1466 {
1467     if (!(local && remote))
1468 	return 0;
1469     lock();
1470     RefPointer<JBStreamSetList> list[2] = {m_s2sReceive,m_compReceive};
1471     unlock();
1472     JBServerStream* stream = 0;
1473     for (int i = 0; i < 2; i++) {
1474 	list[i]->lock();
1475 	for (ObjList* o = list[i]->sets().skipNull(); o; o = o->skipNext()) {
1476 	    JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1477 	    for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1478 		stream = static_cast<JBServerStream*>(s->get());
1479 		if (stream->type() == JBStream::comp ||
1480 		    (out == stream->outgoing() && !stream->dialback())) {
1481 		    // Lock the stream: remote jid might change
1482 		    Lock lock(stream);
1483 		    if (local != stream->local()) {
1484 			stream = 0;
1485 			continue;
1486 		    }
1487 		    bool checkRemote = out || stream->type() == JBStream::comp;
1488 		    if ((checkRemote && remote == stream->remote()) ||
1489 			(!checkRemote && stream->hasRemoteDomain(remote,auth))) {
1490 			stream->ref();
1491 			break;
1492 		    }
1493 		}
1494 		stream = 0;
1495 	    }
1496 	    if (stream)
1497 		break;
1498 	}
1499 	list[i]->unlock();
1500 	if (stream)
1501 	    break;
1502     }
1503     list[0] = list[1] = 0;
1504     return stream;
1505 }
1506 
1507 // Create an outgoing s2s stream.
createServerStream(const String & local,const String & remote,const char * dbId,const char * dbKey,bool dbOnly,const NamedList * params)1508 JBServerStream* JBServerEngine::createServerStream(const String& local,
1509     const String& remote, const char* dbId, const char* dbKey, bool dbOnly,
1510     const NamedList* params)
1511 {
1512     if (exiting()) {
1513 	Debug(this,DebugAll,"Can't create s2s local=%s remote=%s: engine is exiting",
1514 	    local.c_str(),remote.c_str());
1515 	return 0;
1516     }
1517     JBServerStream* stream = 0;
1518     if (!dbOnly)
1519 	stream = findServerStream(local,remote,true);
1520     if (!stream) {
1521 	stream = new JBServerStream(this,local,remote,dbId,dbKey,dbOnly,params);
1522 	stream->ref();
1523 	addStream(stream);
1524     }
1525     else
1526 	TelEngine::destruct(stream);
1527     return stream;
1528 }
1529 
1530 // Create an outgoing comp stream
createCompStream(const String & name,const String & local,const String & remote,const NamedList * params)1531 JBServerStream* JBServerEngine::createCompStream(const String& name, const String& local,
1532     const String& remote, const NamedList* params)
1533 {
1534     if (exiting()) {
1535 	Debug(this,DebugAll,"Can't create comp local=%s remote=%s: engine is exiting",
1536 	    local.c_str(),remote.c_str());
1537 	return 0;
1538     }
1539     JBServerStream* stream = findServerStream(local,remote,true);
1540     if (!stream) {
1541 	stream = new JBServerStream(this,local,remote,&name,params);
1542 	stream->ref();
1543 	addStream(stream);
1544     }
1545     return stream;
1546 }
1547 
1548 // Find a cluster stream by remote domain
findClusterStream(const String & remote,JBClusterStream * skip)1549 JBClusterStream* JBServerEngine::findClusterStream(const String& remote,
1550     JBClusterStream* skip)
1551 {
1552     if (!remote)
1553 	return 0;
1554     lock();
1555     RefPointer<JBStreamSetList> list = m_clusterReceive;
1556     unlock();
1557     JBClusterStream* stream = 0;
1558     list->lock();
1559     for (ObjList* o = list->sets().skipNull(); o; o = o->skipNext()) {
1560 	JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1561 	for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1562 	    stream = static_cast<JBClusterStream*>(s->get());
1563 	    if (skip != stream) {
1564 		Lock lock(stream);
1565 		if (stream->state() != JBStream::Destroy &&
1566 		    remote == stream->remote()) {
1567 		    stream->ref();
1568 		    break;
1569 		}
1570 	    }
1571 	    stream = 0;
1572 	}
1573     }
1574     list->unlock();
1575     list = 0;
1576     return stream;
1577 }
1578 
1579 // Create an outgoing cluster stream
createClusterStream(const String & local,const String & remote,const NamedList * params)1580 JBClusterStream* JBServerEngine::createClusterStream(const String& local,
1581     const String& remote, const NamedList* params)
1582 {
1583     if (exiting()) {
1584 	Debug(this,DebugAll,"Can't create cluster local=%s remote=%s: engine is exiting",
1585 	    local.c_str(),remote.c_str());
1586 	return 0;
1587     }
1588     JBClusterStream* stream = findClusterStream(remote);
1589     if (!stream) {
1590 	stream = new JBClusterStream(this,local,remote,params);
1591 	stream->ref();
1592 	addStream(stream);
1593     }
1594     return stream;
1595 }
1596 
1597 // Terminate all incoming c2s streams matching a given JID
terminateClientStreams(const JabberID & jid,XMPPError::Type error,const char * reason)1598 unsigned int JBServerEngine::terminateClientStreams(const JabberID& jid,
1599     XMPPError::Type error, const char* reason)
1600 {
1601     unsigned int n = 0;
1602     ObjList* list = findClientStreams(true,jid);
1603     if (!list)
1604 	return 0;
1605     n = list->count();
1606     DDebug(this,DebugInfo,"Terminating %u incoming c2s streams jid=%s error=%s reason=%s",
1607 	n,jid.bare().c_str(),XMPPUtils::s_tag[error].c_str(),reason);
1608     for (ObjList* o = list->skipNull(); o; o = o->skipNext()) {
1609 	JBClientStream* stream = static_cast<JBClientStream*>(o->get());
1610 	stream->terminate(-1,true,0,error,reason);
1611     }
1612     TelEngine::destruct(list);
1613     return n;
1614 }
1615 
1616 // Add a stream to one of the stream lists
addStream(JBStream * stream)1617 void JBServerEngine::addStream(JBStream* stream)
1618 {
1619     if (!stream)
1620 	return;
1621     lock();
1622     RefPointer<JBStreamSetList> recv;
1623     RefPointer<JBStreamSetList> process;
1624     getStreamListsType(stream->type(),recv,process);
1625     unlock();
1626     if (recv && process) {
1627 	recv->add(stream);
1628 	process->add(stream);
1629     }
1630     else
1631 	DDebug(this,DebugStub,"JBServerEngine::addStream() type='%s' not handled!",
1632 	    stream->typeName());
1633     recv = 0;
1634     process = 0;
1635     TelEngine::destruct(stream);
1636 }
1637 
1638 // Remove a stream
removeStream(JBStream * stream,bool delObj)1639 void JBServerEngine::removeStream(JBStream* stream, bool delObj)
1640 {
1641     if (!stream)
1642 	return;
1643     JBEngine::removeStream(stream,delObj);
1644     lock();
1645     DDebug(this,DebugAll,"JBServerEngine::removeStream(%p,%u) id=%s",
1646 	stream,delObj,stream->toString().c_str());
1647     RefPointer<JBStreamSetList> recv;
1648     RefPointer<JBStreamSetList> process;
1649     getStreamListsType(stream->type(),recv,process);
1650     unlock();
1651     if (recv)
1652 	recv->remove(stream,delObj);
1653     if (process)
1654 	process->remove(stream,delObj);
1655     recv = 0;
1656     process = 0;
1657 }
1658 
1659 
1660 /*
1661  * JBClientEngine
1662  */
JBClientEngine(const char * name)1663 JBClientEngine::JBClientEngine(const char* name)
1664     : JBEngine(name),
1665     m_receive(0), m_process(0)
1666 {
1667 }
1668 
~JBClientEngine()1669 JBClientEngine::~JBClientEngine()
1670 {
1671 }
1672 
1673 // Terminate all streams
cleanup(bool final,bool waitTerminate)1674 void JBClientEngine::cleanup(bool final, bool waitTerminate)
1675 {
1676     JBEngine::cleanup(final,waitTerminate);
1677     DDebug(this,DebugAll,"JBClientEngine::cleanup() final=%s wait=%s",
1678 	String::boolText(final),String::boolText(waitTerminate));
1679     if (!final)
1680 	return;
1681     Lock lock(this);
1682     TelEngine::destruct(m_receive);
1683     TelEngine::destruct(m_process);
1684 }
1685 
1686 // Find a stream by account
findAccount(const String & account)1687 JBClientStream* JBClientEngine::findAccount(const String& account)
1688 {
1689     if (!account)
1690 	return 0;
1691     RefPointer<JBStreamSetList> list;
1692     getStreamList(list,JBStream::c2s);
1693     if (!list)
1694 	return 0;
1695     JBClientStream* found = 0;
1696     list->lock();
1697     for (ObjList* o = list->sets().skipNull(); !found && o; o = o->skipNext()) {
1698 	JBStreamSet* set = static_cast<JBStreamSet*>(o->get());
1699 	for (ObjList* s = set->clients().skipNull(); s; s = s->skipNext()) {
1700 	    found = static_cast<JBClientStream*>(s->get());
1701 	    if (account == found->account())
1702 		break;
1703 	    found = 0;
1704 	}
1705     }
1706     if (found && !found->ref())
1707 	found = 0;
1708     list->unlock();
1709     list = 0;
1710     return found;
1711 }
1712 
1713 // Build an outgoing client stream
create(const String & account,const NamedList & params,const String & name)1714 JBClientStream* JBClientEngine::create(const String& account, const NamedList& params,
1715     const String& name)
1716 {
1717     if (!account)
1718 	return 0;
1719     String serverHost;
1720     String username = params.getValue("username");
1721     String domain = params.getValue("domain");
1722     int pos = username.find("@");
1723     if (pos > 0) {
1724 	serverHost = domain;
1725 	domain = username.substr(pos + 1);
1726 	username = username.substr(0,pos);
1727     }
1728     if (!domain)
1729 	domain = params.getValue("server",params.getValue("address"));
1730     JabberID jid(username,domain,params.getValue("resource"));
1731     if (!jid.bare()) {
1732 	Debug(this,DebugNote,"Can't create client stream: invalid jid=%s",jid.bare().c_str());
1733 	return 0;
1734     }
1735     Lock lock(this);
1736     JBClientStream* stream = static_cast<JBClientStream*>(findAccount(account));
1737     if (!stream) {
1738 	stream = new JBClientStream(this,jid,account,params,name,serverHost);
1739 	stream->ref();
1740 	addStream(stream);
1741     }
1742     else
1743 	TelEngine::destruct(stream);
1744     return stream;
1745 }
1746 
1747 // Add a stream to one of the stream lists
addStream(JBStream * stream)1748 void JBClientEngine::addStream(JBStream* stream)
1749 {
1750     if (!stream)
1751 	return;
1752     lock();
1753     RefPointer<JBStreamSetList> recv = 0;
1754     RefPointer<JBStreamSetList> process = 0;
1755     if (stream->type() == JBStream::c2s) {
1756 	recv = m_receive;
1757 	process = m_process;
1758     }
1759     unlock();
1760     if (recv && process) {
1761 	recv->add(stream);
1762 	process->add(stream);
1763     }
1764     else
1765 	DDebug(this,DebugStub,"JBClientEngine::addStream() type='%s' not handled!",
1766 	    stream->typeName());
1767     recv = 0;
1768     process = 0;
1769     TelEngine::destruct(stream);
1770 }
1771 
1772 // Remove a stream
removeStream(JBStream * stream,bool delObj)1773 void JBClientEngine::removeStream(JBStream* stream, bool delObj)
1774 {
1775     if (!stream)
1776 	return;
1777     JBEngine::removeStream(stream,delObj);
1778     lock();
1779     DDebug(this,DebugAll,"JBClientEngine::removeStream(%p,%u) id=%s",
1780 	stream,delObj,stream->toString().c_str());
1781     RefPointer<JBStreamSetList> recv;
1782     RefPointer<JBStreamSetList> process;
1783     if (stream->type() == JBStream::c2s) {
1784 	recv = m_receive;
1785 	process = m_process;
1786     }
1787     unlock();
1788     if (recv)
1789 	recv->remove(stream,delObj);
1790     if (process)
1791 	process->remove(stream,delObj);
1792     recv = 0;
1793     process = 0;
1794 }
1795 
1796 // Stop all stream sets
stopStreamSets(bool waitTerminate)1797 void JBClientEngine::stopStreamSets(bool waitTerminate)
1798 {
1799     XDebug(this,DebugAll,"JBClientEngine::stopStreamSets() wait=%s",
1800 	String::boolText(waitTerminate));
1801     lock();
1802     RefPointer<JBStreamSetList> receive = m_receive;
1803     RefPointer<JBStreamSetList> process = m_process;
1804     unlock();
1805     if (receive)
1806 	receive->stop(0,waitTerminate);
1807     if (process)
1808 	process->stop(0,waitTerminate);
1809     receive = 0;
1810     process = 0;
1811 }
1812 
1813 // Retrieve the list of streams of a given type
getStreamList(RefPointer<JBStreamSetList> & list,int type)1814 void JBClientEngine::getStreamList(RefPointer<JBStreamSetList>& list, int type)
1815 {
1816     if (type != JBStream::c2s)
1817 	return;
1818     Lock lock(this);
1819     list = m_receive;
1820 }
1821 
1822 
1823 /*
1824  * JBEvent
1825  */
~JBEvent()1826 JBEvent::~JBEvent()
1827 {
1828     releaseStream(true);
1829     releaseXml(true);
1830     XDebug(DebugAll,"JBEvent::~JBEvent [%p]",this);
1831 }
1832 
1833 // Get a client stream from the event's stream
clientStream()1834 JBClientStream* JBEvent::clientStream()
1835 {
1836     return m_stream ? m_stream->clientStream() : 0;
1837 }
1838 
1839 // Get a server stream from the event's stream
serverStream()1840 JBServerStream* JBEvent::serverStream()
1841 {
1842     return m_stream ? m_stream->serverStream() : 0;
1843 }
1844 
clusterStream()1845 JBClusterStream* JBEvent::clusterStream()
1846 {
1847     return m_stream ? m_stream->clusterStream() : 0;
1848 }
1849 
1850 // Delete the underlying XmlElement(s). Release the ownership.
releaseXml(bool del)1851 XmlElement* JBEvent::releaseXml(bool del)
1852 {
1853     m_child = 0;
1854     if (del) {
1855 	TelEngine::destruct(m_element);
1856 	return 0;
1857     }
1858     XmlElement* tmp = m_element;
1859     m_element = 0;
1860     return tmp;
1861 }
1862 
releaseStream(bool release)1863 void JBEvent::releaseStream(bool release)
1864 {
1865     if (m_link && m_stream) {
1866 	m_stream->eventTerminated(this);
1867 	m_link = false;
1868     }
1869     if (release)
1870 	TelEngine::destruct(m_stream);
1871 }
1872 
1873 // Build an 'iq' result stanza from event data
buildIqResult(bool addTags,XmlElement * child)1874 XmlElement* JBEvent::buildIqResult(bool addTags, XmlElement* child)
1875 {
1876     XmlElement* xml = 0;
1877     if (addTags)
1878 	xml = XMPPUtils::createIqResult(m_to,m_from,m_id,child);
1879     else
1880 	xml = XMPPUtils::createIqResult(0,0,m_id,child);
1881     return xml;
1882 }
1883 
1884 // Build and send a stanza 'result' from enclosed 'iq' element
sendIqResult(XmlElement * child)1885 bool JBEvent::sendIqResult(XmlElement* child)
1886 {
1887     if (!(m_element && m_stream && XMPPUtils::isUnprefTag(*m_element,XmlTag::Iq))) {
1888 	TelEngine::destruct(child);
1889 	return false;
1890     }
1891     if (m_stanzaType == "error" || m_stanzaType == "result") {
1892 	TelEngine::destruct(child);
1893 	return false;
1894     }
1895     XmlElement* xml = buildIqResult(true,child);
1896     bool ok = m_stream->state() == JBStream::Running ?
1897 	m_stream->sendStanza(xml) : m_stream->sendStreamXml(m_stream->state(),xml);
1898     if (ok) {
1899 	releaseXml(true);
1900 	return true;
1901     }
1902     return false;
1903 }
1904 
1905 // Build an 'iq' error stanza from event data
buildIqError(bool addTags,XMPPError::Type error,const char * reason,XMPPError::ErrorType type)1906 XmlElement* JBEvent::buildIqError(bool addTags, XMPPError::Type error, const char* reason,
1907     XMPPError::ErrorType type)
1908 {
1909     XmlElement* xml = 0;
1910     if (addTags)
1911 	xml = XMPPUtils::createIq(XMPPUtils::IqError,m_to,m_from,m_id);
1912     else
1913 	xml = XMPPUtils::createIq(XMPPUtils::IqError,0,0,m_id);
1914     if (!m_id)
1915 	xml->addChild(releaseXml());
1916     xml->addChild(XMPPUtils::createError(type,error,reason));
1917     return xml;
1918 }
1919 
1920 // Build and send a stanza error from enclosed element
sendStanzaError(XMPPError::Type error,const char * reason,XMPPError::ErrorType type)1921 bool JBEvent::sendStanzaError(XMPPError::Type error, const char* reason,
1922     XMPPError::ErrorType type)
1923 {
1924     if (!(m_element && m_stream && XMPPUtils::isStanza(*m_element)))
1925 	return false;
1926     if (m_stanzaType == "error" || m_stanzaType == "result")
1927 	return false;
1928     XmlElement* xml = new XmlElement(m_element->toString());
1929     xml->setAttributeValid("from",m_to);
1930     xml->setAttributeValid("to",m_from);
1931     xml->setAttributeValid("id",m_id);
1932     xml->setAttribute("type","error");
1933     xml->addChild(XMPPUtils::createError(type,error,reason));
1934     bool ok = m_stream->state() == JBStream::Running ?
1935 	m_stream->sendStanza(xml) : m_stream->sendStreamXml(m_stream->state(),xml);
1936     if (ok) {
1937 	releaseXml(true);
1938 	return true;
1939     }
1940     return false;
1941 }
1942 
init(JBStream * stream,XmlElement * element,const JabberID * from,const JabberID * to)1943 bool JBEvent::init(JBStream* stream, XmlElement* element,
1944     const JabberID* from, const JabberID* to)
1945 {
1946     bool bRet = true;
1947     if (stream && stream->ref())
1948 	m_stream = stream;
1949     else
1950 	bRet = false;
1951     m_element = element;
1952     if (from)
1953 	m_from = *from;
1954     if (to)
1955 	m_to = *to;
1956     XDebug(DebugAll,"JBEvent::init type=%s stream=(%p) xml=(%p) [%p]",
1957 	name(),m_stream,m_element,this);
1958     if (!m_element)
1959 	return bRet;
1960 
1961     // Most elements have these parameters:
1962     m_stanzaType = m_element->getAttribute("type");
1963     if (!from)
1964 	m_from = m_element->getAttribute("from");
1965     if (!to)
1966 	m_to = m_element->getAttribute("to");
1967     m_id = m_element->getAttribute("id");
1968 
1969     // Decode some data
1970     int t = XMPPUtils::tag(*m_element);
1971     switch (t) {
1972 	case XmlTag::Message:
1973 	    if (m_stanzaType != "error")
1974 		m_text = XMPPUtils::body(*m_element);
1975 	    else
1976 		XMPPUtils::decodeError(m_element,m_text,m_text);
1977 	    break;
1978 	case XmlTag::Iq:
1979 	case XmlTag::Presence:
1980 	    if (m_stanzaType != "error")
1981 		break;
1982 	default:
1983 	    XMPPUtils::decodeError(m_element,m_text,m_text);
1984     }
1985     return bRet;
1986 }
1987 
1988 
1989 /*
1990  * JBStreamSet
1991  */
1992 // Constructor
JBStreamSet(JBStreamSetList * owner)1993 JBStreamSet::JBStreamSet(JBStreamSetList* owner)
1994     : Mutex(true,"JBStreamSet"),
1995     m_changed(false), m_exiting(false), m_owner(owner)
1996 {
1997     XDebug(m_owner->engine(),DebugAll,"JBStreamSet::JBStreamSet(%s) [%p]",
1998 	m_owner->toString().c_str(),this);
1999 }
2000 
2001 // Remove from owner
~JBStreamSet()2002 JBStreamSet::~JBStreamSet()
2003 {
2004     if (m_clients.skipNull())
2005 	Debug(m_owner->engine(),DebugCrit,
2006 	    "JBStreamSet(%s) destroyed while owning %u streams [%p]",
2007 	    m_owner->toString().c_str(),m_clients.count(),this);
2008     m_owner->remove(this);
2009     XDebug(m_owner->engine(),DebugAll,"JBStreamSet::~JBStreamSet(%s) [%p]",
2010 	m_owner->toString().c_str(),this);
2011 }
2012 
2013 // Add a stream to the set. The stream's reference counter will be increased
add(JBStream * client)2014 bool JBStreamSet::add(JBStream* client)
2015 {
2016     if (!client)
2017 	return false;
2018     Lock lock(this);
2019     if (m_exiting || (m_owner->maxStreams() && m_clients.count() >= m_owner->maxStreams()) ||
2020 	!client->ref())
2021 	return false;
2022     m_clients.append(client);
2023     m_changed = true;
2024     DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) added (%p,'%s') type=%s [%p]",
2025 	m_owner->toString().c_str(),client,client->name(),client->typeName(),this);
2026     return true;
2027 }
2028 
2029 // Remove a stream from set
remove(JBStream * client,bool delObj)2030 bool JBStreamSet::remove(JBStream* client, bool delObj)
2031 {
2032     if (!client)
2033 	return false;
2034     Lock lock(this);
2035     ObjList* o = m_clients.find(client);
2036     if (!o)
2037 	return false;
2038     DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) removing (%p,'%s') delObj=%u [%p]",
2039 	m_owner->toString().c_str(),client,client->name(),delObj,this);
2040     o->remove(delObj);
2041     m_changed = true;
2042     return true;
2043 }
2044 
2045 // Terminate all streams matching and/or local/remote jid
dropAll(const JabberID & local,const JabberID & remote,XMPPError::Type error,const char * reason)2046 unsigned int JBStreamSet::dropAll(const JabberID& local, const JabberID& remote,
2047     XMPPError::Type error, const char* reason)
2048 {
2049     DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) dropAll(%s,%s,%s,%s) [%p]",
2050 	m_owner->toString().c_str(),local.c_str(),remote.c_str(),
2051 	XMPPUtils::s_error[error].c_str(),reason,this);
2052     unsigned int n = 0;
2053     lock();
2054     for (ObjList* s = m_clients.skipNull(); s; s = s->skipNext()) {
2055 	JBStream* stream = static_cast<JBStream*>(s->get());
2056 	Lock lck(stream);
2057 	bool terminate = false;
2058 	if (!(local || remote))
2059 	    terminate = true;
2060 	else {
2061 	    if (local)
2062 		terminate = stream->local().match(local);
2063 	    if (remote && !terminate) {
2064 		JBServerStream* s2s = stream->incoming() ? stream->serverStream() : 0;
2065 		if (s2s)
2066 		    terminate = s2s->hasRemoteDomain(remote,false);
2067 		else
2068 		    terminate = stream->remote().match(remote);
2069 	    }
2070 	}
2071 	if (terminate) {
2072 	    if (stream->state() != JBStream::Destroy)
2073 		n++;
2074 	    stream->terminate(-1,true,0,error,reason);
2075 	}
2076     }
2077     unlock();
2078     return n;
2079 }
2080 
2081 // Process the list
run()2082 void JBStreamSet::run()
2083 {
2084     DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) start running [%p]",
2085 	m_owner->toString().c_str(),this);
2086     ObjList* o = 0;
2087     while (true) {
2088 	if (Thread::check(false)) {
2089 	    m_exiting = true;
2090 	    break;
2091 	}
2092 	lock();
2093 	if (m_changed) {
2094 	    o = 0;
2095 	    m_changed = false;
2096 	}
2097 	else if (o)
2098 	    o = o->skipNext();
2099 	if (!o)
2100 	    o = m_clients.skipNull();
2101 	bool eof = o && !o->skipNext();
2102 	RefPointer<JBStream> stream = o ? static_cast<JBStream*>(o->get()) : 0;
2103 	unlock();
2104 	if (stream) {
2105 	    process(*stream);
2106 	    stream = 0;
2107 	}
2108 	else {
2109 	    // Lock the owner to prevent adding a new client
2110 	    // Don't exit if a new client was already added
2111 	    Lock lock(m_owner);
2112 	    if (!m_changed) {
2113 		m_exiting = true;
2114 		break;
2115 	    }
2116 	}
2117 	if (eof) {
2118 	    if (m_owner->m_sleepMs)
2119 		Thread::msleep(m_owner->m_sleepMs,false);
2120 	    else
2121 		Thread::idle(false);
2122 	}
2123     }
2124     DDebug(m_owner->engine(),DebugAll,"JBStreamSet(%s) stop running [%p]",
2125 	m_owner->toString().c_str(),this);
2126 }
2127 
2128 // Start running
start()2129 bool JBStreamSet::start()
2130 {
2131     Debug(m_owner->engine(),DebugStub,"JBStreamSet(%s)::start() [%p]",
2132 	m_owner->toString().c_str(),this);
2133     return false;
2134 }
2135 
2136 // Stop running
stop()2137 void JBStreamSet::stop()
2138 {
2139     Debug(m_owner->engine(),DebugStub,"JBStreamSet(%s)::stop() [%p]",
2140 	m_owner->toString().c_str(),this);
2141 }
2142 
2143 
2144 /*
2145  * JBStreamSetProcessor
2146  */
2147 // Calls stream's getEvent(). Pass a generated event to the engine
process(JBStream & stream)2148 bool JBStreamSetProcessor::process(JBStream& stream)
2149 {
2150     JBEvent* ev = stream.getEvent();
2151     if (!ev)
2152 	return false;
2153     bool remove = (ev->type() == JBEvent::Destroy);
2154     m_owner->engine()->processEvent(ev);
2155     if (remove) {
2156 	DDebug(m_owner->engine(),DebugAll,
2157 	    "JBStreamSetProcessor(%s) requesting stream (%p,%s) ref %u removal [%p]",
2158 	    m_owner->toString().c_str(),&stream,stream.toString().c_str(),
2159 	    stream.refcount(),this);
2160 	m_owner->engine()->removeStream(&stream,true);
2161     }
2162     return true;
2163 }
2164 
2165 
2166 /*
2167  * JBStreamSetReceive
2168  */
JBStreamSetReceive(JBStreamSetList * owner)2169 JBStreamSetReceive::JBStreamSetReceive(JBStreamSetList* owner)
2170     : JBStreamSet(owner)
2171 {
2172     if (owner && owner->engine())
2173 	m_buffer.assign(0,owner->engine()->streamReadBuffer());
2174 }
2175 
2176 // Calls stream's readSocket()
process(JBStream & stream)2177 bool JBStreamSetReceive::process(JBStream& stream)
2178 {
2179     return stream.readSocket((char*)m_buffer.data(),m_buffer.length());
2180 }
2181 
2182 
2183 /*
2184  * JBStreamSetList
2185  */
2186 // Constructor
JBStreamSetList(JBEngine * engine,unsigned int max,unsigned int sleepMs,const char * name)2187 JBStreamSetList::JBStreamSetList(JBEngine* engine, unsigned int max,
2188     unsigned int sleepMs, const char* name)
2189     : Mutex(true,"JBStreamSetList"),
2190     m_engine(engine), m_name(name),
2191     m_max(max), m_sleepMs(sleepMs), m_streamCount(0)
2192 {
2193     XDebug(m_engine,DebugAll,"JBStreamSetList::JBStreamSetList(%s) [%p]",
2194 	m_name.c_str(),this);
2195 }
2196 
2197 // Destructor
~JBStreamSetList()2198 JBStreamSetList::~JBStreamSetList()
2199 {
2200     XDebug(m_engine,DebugAll,"JBStreamSetList::~JBStreamSetList(%s) [%p]",
2201 	m_name.c_str(),this);
2202 }
2203 
2204 // Add a stream to the list. Build a new set if there is no room in existing sets
add(JBStream * client)2205 bool JBStreamSetList::add(JBStream* client)
2206 {
2207     if (!client || m_engine->exiting())
2208 	return false;
2209     Lock lock(this);
2210     for (ObjList* o = m_sets.skipNull(); o; o = o->skipNext()) {
2211 	if ((static_cast<JBStreamSet*>(o->get()))->add(client)) {
2212 	    m_streamCount++;
2213 	    return true;
2214 	}
2215     }
2216     // Build a new set
2217     JBStreamSet* set = build();
2218     if (!set)
2219 	return false;
2220     if (!set->add(client)) {
2221 	lock.drop();
2222 	TelEngine::destruct(set);
2223 	return false;
2224     }
2225     m_streamCount++;
2226     m_sets.append(set);
2227     Debug(m_engine,DebugAll,"JBStreamSetList(%s) added set (%p) count=%u [%p]",
2228 	m_name.c_str(),set,m_sets.count(),this);
2229     lock.drop();
2230     if (!set->start())
2231 	TelEngine::destruct(set);
2232     return true;
2233 }
2234 
2235 // Remove a stream from list
remove(JBStream * client,bool delObj)2236 void JBStreamSetList::remove(JBStream* client, bool delObj)
2237 {
2238     if (!client)
2239 	return;
2240     DDebug(m_engine,DebugAll,"JBStreamSetList(%s) removing (%p,'%s') delObj=%u [%p]",
2241 	m_name.c_str(),client,client->name(),delObj,this);
2242     Lock lock(this);
2243     for (ObjList* o = m_sets.skipNull(); o; o = o->skipNext()) {
2244 	if ((static_cast<JBStreamSet*>(o->get()))->remove(client,delObj)) {
2245 	    if (m_streamCount)
2246 		m_streamCount--;
2247 	    return;
2248 	}
2249     }
2250 }
2251 
2252 // Stop one set or all sets
stop(JBStreamSet * set,bool waitTerminate)2253 void JBStreamSetList::stop(JBStreamSet* set, bool waitTerminate)
2254 {
2255     // A set will stop when all its streams will terminate
2256     // Stop it now if wait is not requested
2257     Lock lck(this);
2258     if (set) {
2259 	if (set->m_owner != this)
2260 	    return;
2261 	DDebug(m_engine,DebugAll,"JBStreamSetList(%s) stopping set (%p) [%p]",
2262 	    m_name.c_str(),set,this);
2263 	set->dropAll();
2264 	if (!waitTerminate)
2265 	    set->stop();
2266 	lck.drop();
2267 	while (true) {
2268 	    lock();
2269 	    bool ok = (0 == m_sets.find(set));
2270 	    unlock();
2271 	    if (ok)
2272 		break;
2273 	    Thread::yield(!waitTerminate);
2274 	}
2275 	DDebug(m_engine,DebugAll,"JBStreamSetList(%s) stopped set (%p) [%p]",
2276 	    m_name.c_str(),set,this);
2277 	return;
2278     }
2279     ObjList* o = m_sets.skipNull();
2280     if (!o)
2281 	return;
2282     DDebug(m_engine,DebugAll,"JBStreamSetList(%s) stopping %u sets [%p]",
2283 	m_name.c_str(),m_sets.count(),this);
2284     for (; o; o =  o->skipNext()) {
2285 	set = static_cast<JBStreamSet*>(o->get());
2286 	set->dropAll();
2287 	if (!waitTerminate)
2288 	    set->stop();
2289     }
2290     lck.drop();
2291     while (true) {
2292 	lock();
2293 	bool ok = (0 == m_sets.skipNull());
2294 	unlock();
2295 	if (ok)
2296 	    break;
2297 	Thread::yield(!waitTerminate);
2298     }
2299     DDebug(m_engine,DebugAll,"JBStreamSetList(%s) stopped all sets [%p]",
2300 	m_name.c_str(),this);
2301 }
2302 
2303 // Get the string representation of this list
toString() const2304 const String& JBStreamSetList::toString() const
2305 {
2306     return m_name;
2307 }
2308 
2309 // Stop all sets. Release memory
destroyed()2310 void JBStreamSetList::destroyed()
2311 {
2312     stop(0,true);
2313     RefObject::destroyed();
2314 }
2315 
2316 //Remove a set from list without deleting it
remove(JBStreamSet * set)2317 void JBStreamSetList::remove(JBStreamSet* set)
2318 {
2319     if (!set)
2320 	return;
2321     Lock lock(this);
2322     ObjList* o = m_sets.find(set);
2323     if (!o)
2324 	return;
2325     o->remove(false);
2326     Debug(m_engine,DebugAll,"JBStreamSetList(%s) removed set (%p) count=%u [%p]",
2327 	m_name.c_str(),set,m_sets.count(),this);
2328 }
2329 
2330 // Build a specialized stream set. Descendants must override this method
build()2331 JBStreamSet* JBStreamSetList::build()
2332 {
2333     Debug(m_engine,DebugStub,"JBStreamSetList(%s) build() not implemented! [%p]",
2334 	m_name.c_str(),this);
2335     return 0;
2336 }
2337 
2338 
2339 /*
2340  * JBEntityCapsList
2341  */
2342 
2343 class EntityCapsRequest : public String
2344 {
2345 public:
EntityCapsRequest(const String & id,JBEntityCaps * caps)2346     inline EntityCapsRequest(const String& id, JBEntityCaps* caps)
2347 	: String(id), m_caps(caps), m_expire(Time::msecNow() + 30000)
2348 	{}
~EntityCapsRequest()2349     inline ~EntityCapsRequest()
2350 	{ TelEngine::destruct(m_caps); }
2351     JBEntityCaps* m_caps;
2352     u_int64_t m_expire;
2353 private:
EntityCapsRequest()2354     EntityCapsRequest() {}
2355 };
2356 
2357 // Expire pending requests
expire(u_int64_t msecNow)2358 void JBEntityCapsList::expire(u_int64_t msecNow)
2359 {
2360     if (!m_enable)
2361 	return;
2362     Lock lock(this);
2363     // Stop at the first not expired item: the other items are added after it
2364     for (ObjList* o = m_requests.skipNull(); o; o = o->skipNull()) {
2365 	EntityCapsRequest* r = static_cast<EntityCapsRequest*>(o->get());
2366 	if (r->m_caps && msecNow < r->m_expire)
2367 	    break;
2368 	DDebug(DebugInfo,"JBEntityCapsList request id=%s timed out [%p]",
2369 	    r->toString().c_str(),this);
2370 	o->remove();
2371     }
2372 }
2373 
2374 // Process a response. This method is thread safe
processRsp(XmlElement * rsp,const String & id,bool ok)2375 bool JBEntityCapsList::processRsp(XmlElement* rsp, const String& id, bool ok)
2376 {
2377     if (!(rsp && id && id.startsWith(m_reqPrefix)))
2378 	return false;
2379     if (!m_enable)
2380 	return true;
2381     Lock lock(this);
2382     GenObject* o = m_requests.remove(id,false);
2383     if (!o) {
2384 	DDebug(DebugInfo,"JBEntityCapsList::processRsp(%p,%s,%u) id not found [%p]",
2385 	    &rsp,id.c_str(),ok,this);
2386 	return true;
2387     }
2388     while (ok) {
2389 	XmlElement* query = XMPPUtils::findFirstChild(*rsp,XmlTag::Query);
2390 	if (!(query && XMPPUtils::hasXmlns(*query,XMPPNamespace::DiscoInfo)))
2391 	    break;
2392 	EntityCapsRequest* r = static_cast<EntityCapsRequest*>(o);
2393 	JBEntityCaps* caps = r->m_caps;
2394 	if (!caps)
2395 	    break;
2396 	// Check node (only for XEP 0115 ver >= 1.4)
2397 	if (caps->m_version == JBEntityCaps::Ver1_4) {
2398 	    String* node = query->getAttribute("node");
2399 	    if (node && *node != (caps->m_node + "#" + caps->m_data)) {
2400 		DDebug(DebugAll,"JBEntityCapsList response with invalid node=%s [%p]",
2401 		    node->c_str(),this);
2402 		break;
2403 	    }
2404 	}
2405 	caps->m_features.fromDiscoInfo(*query);
2406 	// Check hash
2407 	if (caps->m_version == JBEntityCaps::Ver1_4) {
2408 	    caps->m_features.updateEntityCaps();
2409 	    if (caps->m_data != caps->m_features.m_entityCapsHash) {
2410 		DDebug(DebugAll,"JBEntityCapsList response with invalid hash=%s (expected=%s) [%p]",
2411 		    caps->m_features.m_entityCapsHash.c_str(),caps->m_data.c_str(),this);
2412 		break;
2413 	    }
2414 	}
2415 	r->m_caps = 0;
2416 	// OK
2417 	append(caps);
2418 	capsAdded(caps);
2419 	break;
2420     }
2421     TelEngine::destruct(o);
2422     return true;
2423 }
2424 
2425 // Request entity capabilities.
requestCaps(JBStream * stream,const char * from,const char * to,const String & id,char version,const char * node,const char * data)2426 void JBEntityCapsList::requestCaps(JBStream* stream, const char* from, const char* to,
2427     const String& id, char version, const char* node, const char* data)
2428 {
2429     if (!stream)
2430 	return;
2431     Lock lock(this);
2432     // Make sure we don't send another disco info for the same id
2433     for (ObjList* o = m_requests.skipNull(); o; o = o->skipNext()) {
2434 	EntityCapsRequest* r = static_cast<EntityCapsRequest*>(o->get());
2435 	if (r->m_caps && id == r->m_caps)
2436 	    return;
2437     }
2438     String reqId;
2439     reqId << m_reqPrefix << ++m_reqIndex;
2440     m_requests.append(new EntityCapsRequest(reqId,new JBEntityCaps(id,version,node,data)));
2441     lock.drop();
2442     XmlElement* d = 0;
2443     if (version == JBEntityCaps::Ver1_4)
2444 	d = XMPPUtils::createIqDisco(true,true,from,to,reqId,node,data);
2445     else
2446 	d = XMPPUtils::createIqDisco(true,true,from,to,reqId);
2447     DDebug(DebugAll,"JBEntityCapsList sending request to=%s node=%s id=%s [%p]",
2448 	to,node,reqId.c_str(),this);
2449     stream->sendStanza(d);
2450 }
2451 
2452 // Build a document from this list
toDocument(const char * rootName)2453 XmlDocument* JBEntityCapsList::toDocument(const char* rootName)
2454 {
2455     Lock lock(this);
2456     XmlDocument* doc = new XmlDocument;
2457     XmlDeclaration* decl = new XmlDeclaration;
2458     XmlSaxParser::Error err = doc->addChild(decl);
2459     if (err != XmlSaxParser::NoError)
2460 	TelEngine::destruct(decl);
2461     XmlComment* info = new XmlComment("Generated jabber entity capabilities cache");
2462     err = doc->addChild(info);
2463     if (err != XmlSaxParser::NoError)
2464 	TelEngine::destruct(info);
2465     XmlElement* root = new XmlElement(rootName);
2466     err = doc->addChild(root);
2467     if (err != XmlSaxParser::NoError) {
2468 	TelEngine::destruct(root);
2469 	return doc;
2470     }
2471     for (ObjList* o = skipNull(); o; o = o->skipNext()) {
2472 	JBEntityCaps* caps = static_cast<JBEntityCaps*>(o->get());
2473 	XmlElement* item = new XmlElement(s_entityCapsItem);
2474 	item->setAttribute("id",caps->c_str());
2475 	item->setAttribute("version",String((int)caps->m_version));
2476 	item->setAttribute("node",caps->m_node);
2477 	item->setAttribute("data",caps->m_data);
2478 	caps->m_features.add(*item);
2479 	doc->addChild(item);
2480     }
2481     return doc;
2482 }
2483 
2484 // Build this list from an XML document
fromDocument(XmlDocument & doc,const char * rootName)2485 void JBEntityCapsList::fromDocument(XmlDocument& doc, const char* rootName)
2486 {
2487     Lock lock(this);
2488     clear();
2489     m_requests.clear();
2490     XmlElement* root = doc.root();
2491     if (!root || (!TelEngine::null(rootName) && root->toString() != rootName)) {
2492 	DDebug(DebugAll,"JBEntityCapsList invalid document root %p '%s' (expected=%s) [%p]",
2493 	    root,root ? root->tag() : "",rootName,this);
2494 	return;
2495     }
2496     XmlElement* item = root->findFirstChild(&s_entityCapsItem);
2497     for (; item; item = root->findNextChild(item,&s_entityCapsItem)) {
2498 	String* id = item->getAttribute("id");
2499 	if (TelEngine::null(id))
2500 	    continue;
2501 	String* tmp = item->getAttribute("version");
2502 	JBEntityCaps* cap = new JBEntityCaps(*id,tmp ? tmp->toInteger(-1) : -1,
2503 	    item->attribute("node"),item->attribute("data"));
2504 	cap->m_features.fromDiscoInfo(*item);
2505 	append(cap);
2506     }
2507     capsAdded(0);
2508 }
2509 
2510 // Process an element containing an entity capabily child.
2511 // Request capabilities if not found in the list
processCaps(String & capsId,XmlElement * xml,JBStream * stream,const char * from,const char * to)2512 bool JBEntityCapsList::processCaps(String& capsId, XmlElement* xml, JBStream* stream,
2513     const char* from, const char* to)
2514 {
2515     if (!(m_enable && xml))
2516 	return false;
2517     char version = 0;
2518     String* node = 0;
2519     String* ver = 0;
2520     String* ext = 0;
2521     if (!decodeCaps(*xml,version,node,ver,ext))
2522 	return false;
2523     JBEntityCaps::buildId(capsId,version,*node,*ver,ext);
2524     Lock lock(this);
2525     JBEntityCaps* caps = findCaps(capsId);
2526     if (caps)
2527 	return true;
2528     // Hack for google (doesn't support disco info, supports only disco info with node)
2529     if (version == JBEntityCaps::Ver1_3 &&
2530 	(*node == s_googleTalkNode || *node == s_googleMailNode ||
2531 	*node == s_googleAndroidNode || *node == s_googleAndroidNode2)) {
2532 	caps = new JBEntityCaps(capsId,version,*node,*ver);
2533 	if (ext) {
2534 	    ObjList* list = ext->split(' ',false);
2535 	    if (list->find("voice-v1")) {
2536 		caps->m_features.add(XMPPNamespace::JingleSession);
2537 		caps->m_features.add(XMPPNamespace::JingleAudio);
2538 	    }
2539 	    TelEngine::destruct(list);
2540 	}
2541 	append(caps);
2542 	capsAdded(caps);
2543 	return true;
2544     }
2545     if (stream)
2546 	requestCaps(stream,from,to,capsId,version,*node,*ver);
2547     return stream != 0;
2548 }
2549 
2550 // Add capabilities to a list.
addCaps(NamedList & list,JBEntityCaps & caps)2551 void JBEntityCapsList::addCaps(NamedList& list, JBEntityCaps& caps)
2552 {
2553 #define CHECK_NS(ns,param) \
2554     if (caps.hasFeature(ns)) { \
2555 	params->append(param,","); \
2556 	list.addParam(param,String::boolText(true)); \
2557     }
2558     int jingleVersion = -1;
2559     if (caps.m_features.get(XMPPNamespace::Jingle))
2560 	jingleVersion = 1;
2561     else if (caps.m_features.get(XMPPNamespace::JingleSession) ||
2562 	caps.m_features.get(XMPPNamespace::JingleVoiceV1))
2563 	jingleVersion = 0;
2564     NamedString* params = new NamedString("caps.params");
2565     list.addParam("caps.id",caps.toString());
2566     list.addParam(params);
2567     if (jingleVersion != -1) {
2568 	params->append("caps.jingle_version");
2569 	list.addParam("caps.jingle_version",String(jingleVersion));
2570 	if (caps.hasAudio()) {
2571 	    params->append("caps.audio",",");
2572 	    list.addParam("caps.audio",String::boolText(true));
2573 	}
2574 	switch (jingleVersion) {
2575 	    case 1:
2576 		CHECK_NS(XMPPNamespace::JingleTransfer,"caps.calltransfer");
2577 		CHECK_NS(XMPPNamespace::JingleAppsFileTransfer,"caps.filetransfer");
2578 		break;
2579 	    case 0:
2580 		break;
2581 	}
2582 	CHECK_NS(XMPPNamespace::FileInfoShare,"caps.fileinfoshare");
2583 	CHECK_NS(XMPPNamespace::ResultSetMngt,"caps.resultsetmngt");
2584     }
2585     CHECK_NS(XMPPNamespace::Muc,"caps.muc");
2586 #undef CHECK_NS
2587 }
2588 
2589 // Load (reset) this list from an XML document file.
loadXmlDoc(const char * file,DebugEnabler * enabler)2590 bool JBEntityCapsList::loadXmlDoc(const char* file, DebugEnabler* enabler)
2591 {
2592     if (!m_enable)
2593 	return false;
2594     XmlDocument d;
2595     int io = 0;
2596     DDebug(enabler,DebugAll,"Loading entity caps from '%s'",file);
2597     XmlSaxParser::Error err = d.loadFile(file,&io);
2598     if (err == XmlSaxParser::NoError) {
2599 	fromDocument(d);
2600 	return true;
2601     }
2602     String error;
2603     if (err == XmlSaxParser::IOError) {
2604 	String tmp;
2605 	Thread::errorString(tmp,io);
2606 	error << " " << io << " '" << tmp << "'";
2607     }
2608     Debug(enabler,DebugNote,"Failed to load entity caps from '%s': %s%s",
2609 	file,XmlSaxParser::getError(err),error.safe());
2610     return false;
2611 }
2612 
2613 // Save this list to an XML document file.
saveXmlDoc(const char * file,DebugEnabler * enabler)2614 bool JBEntityCapsList::saveXmlDoc(const char* file, DebugEnabler* enabler)
2615 {
2616     DDebug(enabler,DebugAll,"Saving entity caps to '%s'",file);
2617     if (TelEngine::null(file))
2618 	return false;
2619     XmlDocument* doc = toDocument();
2620     int res = doc->saveFile(file,true,"  ");
2621     if (res)
2622 	Debug(enabler,DebugNote,"Failed to save entity caps to '%s'",file);
2623     delete doc;
2624     return res == 0;
2625 }
2626 
2627 // Check if an XML element has a 'c' entity capability child and process it
decodeCaps(const XmlElement & xml,char & version,String * & node,String * & ver,String * & ext)2628 bool JBEntityCapsList::decodeCaps(const XmlElement& xml, char& version, String*& node,
2629     String*& ver, String*& ext)
2630 {
2631     // Find the first entity caps child with valid node and ext
2632     XmlElement* c = 0;
2633     while (true) {
2634 	c = XMPPUtils::findNextChild(xml,c,XmlTag::EntityCapsTag,
2635 	    XMPPNamespace::EntityCaps);
2636 	if (!c)
2637 	    break;
2638 	if (TelEngine::null(c->getAttribute("node")) ||
2639 	    TelEngine::null(c->getAttribute("ver")))
2640 	    continue;
2641 	break;
2642     }
2643     if (!c)
2644 	return false;
2645     // Check for a subsequent child with new entity caps if the first one is an old version
2646     if (!c->getAttribute("hash")) {
2647 	XmlElement* s = c;
2648 	while (true) {
2649 	    s = XMPPUtils::findNextChild(xml,s,XmlTag::EntityCapsTag,
2650 		XMPPNamespace::EntityCaps);
2651 	    if (!s)
2652 		break;
2653 	    if (!s->getAttribute("hash") ||
2654 		TelEngine::null(s->getAttribute("node")) ||
2655 		TelEngine::null(s->getAttribute("ver")))
2656 		continue;
2657 	    c = s;
2658 	    break;
2659 	}
2660     }
2661     node = c->getAttribute("node");
2662     ver = c->getAttribute("ver");
2663     String* hash = c->getAttribute("hash");
2664     if (hash) {
2665 	// Version 1.4 or greater
2666 	if (*hash != "sha-1")
2667 	    return false;
2668 	version = JBEntityCaps::Ver1_4;
2669 	ext = 0;
2670     }
2671     else {
2672 	version = JBEntityCaps::Ver1_3;
2673 	ext = c->getAttribute("ext");
2674     }
2675     return true;
2676 }
2677 
2678 /* vi: set ts=8 sw=4 sts=4 noet: */
2679