1 /**
2  * jbstream.cpp
3  * Yet Another Jabber Component Protocol Stack
4  * This file is part of the YATE Project http://YATE.null.ro
5  *
6  * Yet Another Telephony Engine - a fully featured software PBX and IVR
7  * Copyright (C) 2004-2014 Null Team
8  *
9  * This software is distributed under multiple licenses;
10  * see the COPYING file in the main directory for licensing
11  * information for this specific distribution.
12  *
13  * This use of this software may be subject to additional restrictions.
14  * See the LEGAL file in the main directory for details.
15  *
16  * This program is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
19  */
20 
21 #include <yatejabber.h>
22 #include <stdlib.h>
23 
24 using namespace TelEngine;
25 
26 #ifdef XDEBUG
27   #define JBSTREAM_DEBUG_COMPRESS
28   #define JBSTREAM_DEBUG_SOCKET
29 #else
30 //  #define JBSTREAM_DEBUG_COMPRESS                 // Show (de)compress debug
31 //  #define JBSTREAM_DEBUG_SOCKET                   // Show socket read/write debug
32 #endif
33 
// Tag names compared by isDbVerify() / isDbResult() against elements
//  in the dialback namespace
static const String s_dbVerify = "verify";
static const String s_dbResult = "result";
36 
isDbVerify(XmlElement & xml)37 static inline bool isDbVerify(XmlElement& xml)
38 {
39     const String* tag = 0;
40     const String* ns = 0;
41     return xml.getTag(tag,ns) && *tag == s_dbVerify &&
42 	ns && *ns == XMPPUtils::s_ns[XMPPNamespace::Dialback];
43 }
44 
isDbResult(XmlElement & xml)45 static inline bool isDbResult(XmlElement& xml)
46 {
47     const String* tag = 0;
48     const String* ns = 0;
49     return xml.getTag(tag,ns) && *tag == s_dbResult &&
50 	ns && *ns == XMPPUtils::s_ns[XMPPNamespace::Dialback];
51 }
52 
53 // Decode a Base64 string to a block
decodeBase64(DataBlock & buf,const String & str)54 static inline bool decodeBase64(DataBlock& buf, const String& str)
55 {
56     Base64 b((void*)str.c_str(),str.length(),false);
57     bool ok = b.decode(buf,false);
58     b.clear(false);
59     return ok;
60 }
61 
62 // Decode a Base64 string to another string
63 // Check if decoded data has valid UTF8 characters
decodeBase64(String & buf,const String & str,JBStream * stream)64 static bool decodeBase64(String& buf, const String& str, JBStream* stream)
65 {
66     DataBlock d;
67     if (!decodeBase64(d,str))
68 	return false;
69     buf.assign((const char*)d.data(),d.length());
70     if (-1 != buf.lenUtf8())
71 	return true;
72     Debug(stream,DebugNote,"Received Base64 encoded invalid UTF8 characters [%p]",stream);
73     return false;
74 }
75 
76 #ifdef DEBUG
checkPing(JBStream * stream,const XmlElement * xml,const String & pingId)77 static bool checkPing(JBStream* stream, const XmlElement* xml, const String& pingId)
78 {
79     if (!(stream && xml && pingId))
80 	return false;
81     if (pingId != xml->getAttribute(YSTRING("id")))
82 	return false;
83     const char* it = xml->attribute(YSTRING("type"));
84     XMPPUtils::IqType iqType = XMPPUtils::iqType(it);
85     bool ok = (iqType == XMPPUtils::IqResult || iqType == XMPPUtils::IqError);
86     if (ok)
87 	Debug(stream,DebugAll,"Ping with id=%s confirmed by '%s' [%p]",pingId.c_str(),it,stream);
88     return ok;
89 }
90 #else
checkPing(JBStream * stream,const XmlElement * xml,const String & pingId)91 static inline bool checkPing(JBStream* stream, const XmlElement* xml, const String& pingId)
92 {
93     return false;
94 }
95 #endif
96 
// Names of the 'location' values passed to JBStream::terminate()
// Used there to build the termination debug message
static const TokenDict s_location[] = {
    {"internal",     0},
    {"remote",       1},
    {"local",       -1},
    {0,0},
};
103 
// Stream state names, indexed by the JBStream state values
// The list is terminated by a null entry
const TokenDict JBStream::s_stateName[] = {
    {"Running",          Running},
    {"Idle",             Idle},
    {"Connecting",       Connecting},
    {"WaitStart",        WaitStart},
    {"Starting",         Starting},
    {"Features",         Features},
    {"WaitTlsRsp",       WaitTlsRsp},
    {"Auth",             Auth},
    {"Challenge",        Challenge},
    {"Securing",         Securing},
    {"Compressing",      Compressing},
    {"Register",         Register},
    {"Destroy",          Destroy},
    {0,0},
};
120 
// Stream flag names
// The outgoing stream constructor decodes the 'options' parameter using this
//  list and keeps only the bits in StreamFlags
// NOTE(review): 'secured' maps to 2 bits (StreamTls | StreamSecured) while all
//  other names map to a single flag - confirm this is intended
// The list is terminated by a null entry
const TokenDict JBStream::s_flagName[] = {
    {"noautorestart",    NoAutoRestart},
    {"tlsrequired",      TlsRequired},
    {"dialback",         DialbackOnly},
    {"allowplainauth",   AllowPlainAuth},
    {"register",         RegisterUser},
    {"compress",         Compress},
    {"error",            InError},
    // Internal flags
    {"roster_requested", RosterRequested},
    {"online",           AvailableResource},
    {"secured",          StreamTls | StreamSecured},
    {"encrypted",        StreamTls},
    {"authenticated",    StreamAuthenticated},
    {"waitbindrsp",      StreamWaitBindRsp},
    {"waitsessrsp",      StreamWaitSessRsp},
    {"waitchallenge",    StreamWaitChallenge},
    {"waitchallengersp", StreamWaitChgRsp},
    {"version1",         StreamRemoteVer1},
    {"compressed",       StreamCompressed},
    {"cancompress",      StreamCanCompress},
    {0,0}
};
144 
// Stream type names, indexed by the JBStream type values
// The list is terminated by a null entry
const TokenDict JBStream::s_typeName[] = {
    {"c2s",      c2s},
    {"s2s",      s2s},
    {"comp",     comp},
    {"cluster",  cluster},
    {0,0}
};
152 
153 // Retrieve the multiplier for non client stream timers
timerMultiplier(JBStream * stream)154 static inline unsigned int timerMultiplier(JBStream* stream)
155 {
156     return stream->type() == JBStream::c2s ? 1 : 2;
157 }
158 
159 
160 /*
161  * JBStream
162  */
163 // Incoming
JBStream(JBEngine * engine,Socket * socket,Type t,bool ssl)164 JBStream::JBStream(JBEngine* engine, Socket* socket, Type t, bool ssl)
165     : Mutex(true,"JBStream"),
166     m_sasl(0),
167     m_state(Idle), m_flags(0), m_xmlns(XMPPNamespace::Count), m_lastEvent(0),
168     m_setupTimeout(0), m_startTimeout(0),
169     m_pingTimeout(0), m_pingInterval(0), m_nextPing(0),
170     m_idleTimeout(0), m_connectTimeout(0),
171     m_restart(0), m_timeToFillRestart(0),
172     m_engine(engine), m_type(t),
173     m_incoming(true), m_terminateEvent(0), m_ppTerminate(0), m_ppTerminateTimeout(0),
174     m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
175     m_connectPort(0), m_compress(0), m_connectStatus(JBConnect::Start),
176     m_redirectMax(0), m_redirectCount(0), m_redirectPort(0)
177 {
178     if (ssl)
179 	setFlags(StreamSecured | StreamTls);
180     m_engine->buildStreamName(m_name,this);
181     debugName(m_name);
182     debugChain(m_engine);
183     Debug(this,DebugAll,"JBStream::JBStream(%p,%p,%s,%s) incoming [%p]",
184 	engine,socket,typeName(),String::boolText(ssl),this);
185     setXmlns();
186     // Don't restart incoming streams
187     setFlags(NoAutoRestart);
188     resetConnection(socket);
189     changeState(WaitStart);
190 }
191 
192 // Outgoing
JBStream(JBEngine * engine,Type t,const JabberID & local,const JabberID & remote,const char * name,const NamedList * params,const char * serverHost)193 JBStream::JBStream(JBEngine* engine, Type t, const JabberID& local, const JabberID& remote,
194     const char* name, const NamedList* params, const char* serverHost)
195     : Mutex(true,"JBStream"),
196     m_sasl(0),
197     m_state(Idle), m_local(local), m_remote(remote), m_serverHost(serverHost),
198     m_flags(0), m_xmlns(XMPPNamespace::Count), m_lastEvent(0), m_stanzaIndex(0),
199     m_setupTimeout(0), m_startTimeout(0),
200     m_pingTimeout(0), m_nextPing(0),
201     m_idleTimeout(0), m_connectTimeout(0),
202     m_restart(1), m_timeToFillRestart(0),
203     m_engine(engine), m_type(t),
204     m_incoming(false), m_name(name),
205     m_terminateEvent(0), m_ppTerminate(0), m_ppTerminateTimeout(0),
206     m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
207     m_connectPort(0), m_compress(0), m_connectStatus(JBConnect::Start),
208     m_redirectMax(engine->redirectMax()), m_redirectCount(0), m_redirectPort(0)
209 {
210     if (!m_name)
211 	m_engine->buildStreamName(m_name,this);
212     debugName(m_name);
213     debugChain(m_engine);
214     if (params) {
215 	int flgs = XMPPUtils::decodeFlags(params->getValue("options"),s_flagName);
216 	setFlags(flgs & StreamFlags);
217 	m_connectAddr = params->getValue("server",params->getValue("address"));
218 	m_connectPort = params->getIntValue("port");
219 	m_localIp = params->getValue("localip");
220     }
221     else
222 	updateFromRemoteDef();
223     // Compress always defaults to true if not explicitly disabled
224     if (!flag(Compress) && !(params && params->getBoolValue("nocompression")))
225 	setFlags(Compress);
226     Debug(this,DebugAll,"JBStream::JBStream(%p,%s,%s,%s,%s) outgoing [%p]",
227 	engine,typeName(),local.c_str(),remote.c_str(),m_serverHost.safe(),this);
228     setXmlns();
229     changeState(Idle);
230 }
231 
// Destructor
// Releases the SASL data. The rest of the cleanup is done in destroyed()
JBStream::~JBStream()
{
    DDebug(this,DebugAll,"JBStream::~JBStream() id=%s [%p]",m_name.c_str(),this);
    TelEngine::destruct(m_sasl);
}
238 
239 // Outgoing stream connect terminated notification.
connectTerminated(Socket * & sock)240 void JBStream::connectTerminated(Socket*& sock)
241 {
242     Lock lock(this);
243     if (m_state == Connecting) {
244 	if (sock) {
245 	    resetConnection(sock);
246 	    sock = 0;
247 	    changeState(Starting);
248 	    XmlElement* s = buildStreamStart();
249 	    sendStreamXml(WaitStart,s);
250 	}
251 	else {
252 	    DDebug(this,DebugNote,"Connect failed [%p]",this);
253 	    resetConnectStatus();
254 	    setRedirect();
255 	    m_redirectCount = 0;
256 	    terminate(0,false,0,XMPPError::NoRemote);
257 	}
258 	return;
259     }
260     DDebug(this,DebugInfo,"Connect terminated notification in non %s state [%p]",
261 	lookup(Connecting,s_stateName),this);
262     if (sock) {
263 	delete sock;
264 	sock = 0;
265     }
266 }
267 
268 // Connecting notification. Start connect timer for synchronous connect
connecting(bool sync,int stat,ObjList & srvs)269 bool JBStream::connecting(bool sync, int stat, ObjList& srvs)
270 {
271     if (incoming() || !m_engine || state() != Connecting)
272 	return false;
273     Lock lock(this);
274     if (state() != Connecting)
275 	return false;
276     m_connectStatus = stat;
277     SrvRecord::copy(m_connectSrvs,srvs);
278     if (sync) {
279 	if (stat != JBConnect::Srv)
280 	    m_connectTimeout = Time::msecNow() + m_engine->m_connectTimeout;
281 	else
282 	    m_connectTimeout = Time::msecNow() + m_engine->m_srvTimeout;
283     }
284     else
285 	m_connectTimeout = 0;
286     DDebug(this,DebugAll,"Connecting sync=%u stat=%s [%p]",
287 	sync,lookup(m_connectStatus,JBConnect::s_statusName),this);
288     return true;
289 }
290 
291 // Get an object from this stream
getObject(const String & name) const292 void* JBStream::getObject(const String& name) const
293 {
294     if (name == "Socket*")
295 	return state() == Securing ? (void*)&m_socket : 0;
296     if (name == "Compressor*")
297 	return (void*)&m_compress;
298     if (name == "JBStream")
299 	return (void*)this;
300     return RefObject::getObject(name);
301 }
302 
// Get the string representation of this stream: the stream name
const String& JBStream::toString() const
{
    return m_name;
}
308 
309 // Check if the stream has valid pending data
haveData()310 bool JBStream::haveData()
311 {
312     Lock2 lck(this,&m_socketMutex);
313     // Pending data with socket available for writing
314     if (m_pending.skipNull() && socketCanWrite())
315 	return true;
316     // Pending events
317     if (m_events.skipNull())
318 	return true;
319     // Pending incoming XML
320     XmlDocument* doc = m_xmlDom ? m_xmlDom->document() : 0;
321     XmlElement* root = doc ? doc->root(false) : 0;
322     XmlElement* first = root ? root->findFirstChild() : 0;
323     return first && first->completed();
324 }
325 
326 // Retrieve connection address(es), port and status
connectAddr(String & addr,int & port,String & localip,int & stat,ObjList & srvs,bool * isRedirect) const327 void JBStream::connectAddr(String& addr, int& port, String& localip, int& stat,
328     ObjList& srvs, bool* isRedirect) const
329 {
330     if (m_redirectAddr) {
331 	addr = m_redirectAddr;
332 	port = m_redirectPort;
333     }
334     else {
335 	addr = m_connectAddr;
336 	port = m_connectPort;
337     }
338     if (isRedirect)
339 	*isRedirect = !m_redirectAddr.null();
340     localip = m_localIp;
341     stat = m_connectStatus;
342     SrvRecord::copy(srvs,m_connectSrvs);
343 }
344 
345 // Set/reset RosterRequested flag
setRosterRequested(bool ok)346 void JBStream::setRosterRequested(bool ok)
347 {
348     Lock lock(this);
349     if (ok == flag(RosterRequested))
350 	return;
351     if (ok)
352 	setFlags(RosterRequested);
353     else
354 	resetFlags(RosterRequested);
355     XDebug(this,DebugAll,"%s roster requested flag [%p]",ok ? "Set" : "Reset",this);
356 }
357 
358 // Set/reset AvailableResource/PositivePriority flags
setAvailableResource(bool ok,bool positive)359 bool JBStream::setAvailableResource(bool ok, bool positive)
360 {
361     Lock lock(this);
362     if (ok && positive)
363 	setFlags(PositivePriority);
364     else
365 	resetFlags(PositivePriority);
366     if (ok == flag(AvailableResource))
367 	return false;
368     if (ok)
369 	setFlags(AvailableResource);
370     else
371 	resetFlags(AvailableResource);
372     XDebug(this,DebugAll,"%s available resource flag [%p]",ok ? "Set" : "Reset",this);
373     return true;
374 }
375 
// Read data from socket. Send it to the parser
// buf: caller supplied read buffer. len: buffer length (at least 2 bytes)
// One byte is reserved for the NUL terminator added after the read data
// In WaitTlsRsp state a single byte is read on each call
// Compressed streams have the data decompressed before parsing
// On parser, decompression or socket failure (including EOF) reading is disabled
//  and the stream termination is postponed using postponeTerminate()
// Returns true if at least 1 byte was read from the socket
bool JBStream::readSocket(char* buf, unsigned int len)
{
    if (!(buf && len > 1))
	return false;
    // Fast check without locking
    if (!socketCanRead())
	return false;
    Lock2 lock(*this,m_socketMutex);
    // Check again with both stream and socket locked
    if (!socketCanRead() || state() == Destroy || state() == Idle || state() == Connecting)
	return false;
    // Mark the socket as being read: checked after the (unlocked) read
    socketSetReading(true);
    if (state() != WaitTlsRsp)
	len--;
    else
	len = 1;
    // Don't keep the mutexes locked while reading
    lock.drop();
    XMPPError::Type error = XMPPError::NoError;
    int read = m_socket->readData(buf,len);
    Lock lck(m_socketMutex);
    // Check if the connection is waiting to be reset
    if (socketWaitReset()) {
	socketSetReading(false);
	return false;
    }
    // Check if something changed
    if (!(m_socket && socketReading())) {
	Debug(this,DebugAll,"Socket deleted while reading [%p]",this);
	return false;
    }
    // Data was read: parse it (decompress first if needed)
    if (read && read != Socket::socketError()) {
	if (!flag(StreamCompressed)) {
	    buf[read] = 0;
#ifdef JBSTREAM_DEBUG_SOCKET
	    Debug(this,DebugInfo,"Received %s [%p]",buf,this);
#endif
	    if (!m_xmlDom->parse(buf)) {
		// Incomplete XML is an error only if the parser buffer grows too much
		if (m_xmlDom->error() != XmlSaxParser::Incomplete)
		    error = XMPPError::Xml;
		else if (m_xmlDom->buffer().length() > m_engine->m_maxIncompleteXml)
		    error = XMPPError::Policy;
	    }
	}
	else if (m_compress) {
#ifdef JBSTREAM_DEBUG_SOCKET
	    Debug(this,DebugInfo,"Received %d compressed bytes [%p]",read,this);
#endif
	    DataBlock d;
	    int res = m_compress->decompress(buf,read,d);
	    // Anything but the whole input consumed is handled as failure
	    if (res == read) {
#ifdef JBSTREAM_DEBUG_COMPRESS
		Debug(this,DebugInfo,"Decompressed %d --> %u [%p]",read,d.length(),this);
#endif
		if (d.length()) {
		    // NUL terminate the decompressed data before parsing it
		    char c = 0;
		    d.append(&c,1);
		    buf = (char*)d.data();
#ifdef JBSTREAM_DEBUG_SOCKET
		    Debug(this,DebugInfo,"Received compressed %s [%p]",buf,this);
#endif
		    if (!m_xmlDom->parse(buf)) {
			if (m_xmlDom->error() != XmlSaxParser::Incomplete)
			    error = XMPPError::Xml;
			else if (m_xmlDom->buffer().length() > m_engine->m_maxIncompleteXml)
			    error = XMPPError::Policy;
		    }
		}
	    }
	    else
		error = XMPPError::UndefinedCondition;
	}
	else
	    // Compressed stream without decompressor
	    error = XMPPError::Internal;
    }
    socketSetReading(false);
    // Check socket error or EOF (nothing read)
    if (read) {
	if (read == Socket::socketError()) {
	    if (m_socket->canRetry()) {
		read = 0;
#ifdef XDEBUG
		String tmp;
		Thread::errorString(tmp,m_socket->error());
		Debug(this,DebugAll,"Socket temporary unavailable for read. %d: '%s' [%p]",
		    m_socket->error(),tmp.c_str(),this);
#endif
	    }
	    else
		error = XMPPError::SocketError;
	}
    }
    else
	error = XMPPError::SocketError;
    if (error == XMPPError::NoError) {
	// Stop reading if waiting for TLS start and received a complete element
	// We'll wait for the stream processor to handle the received element
	if (read && state() == WaitTlsRsp && !m_xmlDom->buffer().length() &&
	    m_xmlDom->unparsed() == XmlSaxParser::None) {
	    XmlDocument* doc = m_xmlDom->document();
	    // If received a complete element, the parser's current element is
	    // the document's root
	    if (doc && m_xmlDom->isCurrent(doc->root())) {
		DDebug(this,DebugAll,"Received complete element in state=%s. Stop reading [%p]",
		    stateName(),this);
		socketSetCanRead(false);
	    }
	}
	return read > 0;
    }
    // Error
    // Build the termination reason. The location is set to 1 on stream EOF only
    int location = 0;
    String reason;
    if (error != XMPPError::SocketError) {
	if (error == XMPPError::Xml) {
	    reason << "Parser error '" << m_xmlDom->getError() << "'";
	    Debug(this,DebugNote,"%s buffer='%s' [%p]",
		reason.c_str(),m_xmlDom->buffer().c_str(),this);
	}
	else if (error == XMPPError::UndefinedCondition) {
	    reason = "Decompression failure";
	    Debug(this,DebugNote,"Decompressor failure [%p]",this);
	}
	else if (error == XMPPError::Internal) {
	    reason = "Decompression failure";
	    Debug(this,DebugNote,"No decompressor [%p]",this);
	}
	else {
	    // XMPPError::Policy: incomplete XML buffer too long
	    reason = "Parser error 'XML element too long'";
	    Debug(this,DebugNote,"Parser overflow len=%u max= %u [%p]",
		m_xmlDom->buffer().length(),m_engine->m_maxIncompleteXml,this);
	}
    }
    else if (read) {
	String tmp;
	Thread::errorString(tmp,m_socket->error());
	reason << "Socket read error: " << tmp << " (" << m_socket->error() << ")";
	Debug(this,DebugWarn,"%s [%p]",reason.c_str(),this);
    }
    else {
	reason = "Stream EOF";
	Debug(this,DebugInfo,"%s [%p]",reason.c_str(),this);
	location = 1;
    }
    socketSetCanRead(false);
    // Release the socket mutex before postponing the termination
    lck.drop();
    postponeTerminate(location,m_incoming,error,reason);
    return read > 0;
}
523 
524 // Stream state processor
getEvent(u_int64_t time)525 JBEvent* JBStream::getEvent(u_int64_t time)
526 {
527     if (m_lastEvent)
528 	return 0;
529     Lock lock(this);
530     if (m_lastEvent)
531 	return 0;
532     XDebug(this,DebugAll,"JBStream::getEvent() [%p]",this);
533     checkPendingEvent();
534     if (!m_lastEvent) {
535 	if (canProcess(time)) {
536 	    process(time);
537 	    checkPendingEvent();
538 	    if (!m_lastEvent)
539 		checkTimeouts(time);
540 	}
541 	else
542 	    checkPendingEvent();
543     }
544 #ifdef XDEBUG
545     if (m_lastEvent)
546 	Debug(this,DebugAll,"Generating event (%p,%s) in state '%s' [%p]",
547 	    m_lastEvent,m_lastEvent->name(),stateName(),this);
548 #endif
549     return m_lastEvent;
550 }
551 
552 // Send a stanza ('iq', 'message' or 'presence') or dialback elements in Running state.
sendStanza(XmlElement * & xml)553 bool JBStream::sendStanza(XmlElement*& xml)
554 {
555     if (!xml)
556 	return false;
557     DDebug(this,DebugAll,"sendStanza(%p) '%s' [%p]",xml,xml->tag(),this);
558     if (!(XMPPUtils::isStanza(*xml) ||
559 	(m_type == s2s && XMPPUtils::hasXmlns(*xml,XMPPNamespace::Dialback)))) {
560 	Debug(this,DebugNote,"Request to send non stanza xml='%s' [%p]",xml->tag(),this);
561 	TelEngine::destruct(xml);
562 	return false;
563     }
564     XmlElementOut* xo = new XmlElementOut(xml);
565     xml = 0;
566     xo->prepareToSend();
567     Lock lock(this);
568     m_pending.append(xo);
569     sendPending();
570     return true;
571 }
572 
573 // Send stream related XML when negotiating the stream
574 //  or some other stanza in non Running state
sendStreamXml(State newState,XmlElement * first,XmlElement * second,XmlElement * third)575 bool JBStream::sendStreamXml(State newState, XmlElement* first, XmlElement* second,
576     XmlElement* third)
577 {
578     DDebug(this,DebugAll,"sendStreamXml(%s,%p,%p,%p) [%p]",
579 	stateName(),first,second,third,this);
580     Lock lock(this);
581     bool ok = false;
582     XmlFragment frag;
583     // Use a do while() to break to the end: safe cleanup
584     do {
585 	if (m_state == Idle || m_state == Destroy)
586 	    break;
587 	// Check if we have unsent stream xml
588 	if (m_outStreamXml)
589 	    sendPending(true);
590 	if (m_outStreamXml)
591 	    break;
592 	if (!first)
593 	    break;
594 	// Add stream declaration before stream start
595 	if (first->getTag() == XMPPUtils::s_tag[XmlTag::Stream] &&
596 	    first->tag()[0] != '/') {
597 	    XmlDeclaration* decl = new XmlDeclaration;
598 	    decl->toString(m_outStreamXml,true);
599 	    frag.addChild(decl);
600 	}
601 	first->toString(m_outStreamXml,true,String::empty(),String::empty(),false);
602 	frag.addChild(first);
603 	if (second) {
604 	    second->toString(m_outStreamXml,true,String::empty(),String::empty(),false);
605 	    frag.addChild(second);
606 	    if (third) {
607 		third->toString(m_outStreamXml,true,String::empty(),String::empty(),false);
608 		frag.addChild(third);
609 	    }
610 	}
611 	first = second = third = 0;
612 	if (flag(StreamCompressed) && !compress()) {
613 	    ok = false;
614 	    break;
615 	}
616 	m_engine->printXml(this,true,frag);
617 	ok = sendPending(true);
618     } while (false);
619     TelEngine::destruct(first);
620     TelEngine::destruct(second);
621     TelEngine::destruct(third);
622     if (ok)
623 	changeState(newState);
624     return ok;
625 }
626 
627 // Start the stream. This method should be called by the upper layer
628 //  when processing an incoming stream Start event
start(XMPPFeatureList * features,XmlElement * caps,bool useVer1)629 void JBStream::start(XMPPFeatureList* features, XmlElement* caps, bool useVer1)
630 {
631     Lock lock(this);
632     if (m_state != Starting)
633 	return;
634     if (outgoing()) {
635 	TelEngine::destruct(features);
636 	TelEngine::destruct(caps);
637 	if (m_type == c2s) {
638 	    // c2s: just wait for stream features
639 	    changeState(Features);
640 	}
641 	else if (m_type == s2s) {
642 	    // Wait features ?
643 	    if (flag(StreamRemoteVer1)) {
644 		changeState(Features);
645 		return;
646 	    }
647 	    // Stream not secured
648 	    if (!flag(StreamSecured)) {
649 		// Accept dialback auth stream
650 		// The namspace presence was already checked in checkStreamStart()
651 		if (flag(TlsRequired)) {
652 		    terminate(0,false,0,XMPPError::EncryptionRequired);
653 		    return;
654 		}
655 	    }
656 	    setFlags(StreamSecured);
657 	    serverStream()->sendDialback();
658 	}
659 	else if (m_type == cluster)
660 	    changeState(Features);
661 	else if (m_type == comp)
662 	    serverStream()->startComp();
663 	else
664 	    DDebug(this,DebugStub,"JBStream::start() not handled for type=%s",typeName());
665 	return;
666     }
667     m_features.clear();
668     if (features)
669 	m_features.add(*features);
670     if (useVer1 && flag(StreamRemoteVer1))
671 	setFlags(StreamLocalVer1);
672     if (flag(StreamRemoteVer1) && flag(StreamLocalVer1)) {
673 	// Set secured flag if we don't advertise TLS
674 	if (!(flag(StreamSecured) || m_features.get(XMPPNamespace::Tls)))
675 	    setSecured();
676 	// Set authenticated flag if we don't advertise authentication mechanisms
677 	if (flag(StreamSecured)) {
678 	    if (flag(StreamAuthenticated))
679 	        m_features.remove(XMPPNamespace::Sasl);
680 	    else if (!m_features.get(XMPPNamespace::Sasl))
681 		setFlags(StreamAuthenticated);
682 	}
683     }
684     else
685 	// c2s using non-sasl auth or s2s not using TLS
686 	setSecured();
687     // Send start and features
688     XmlElement* s = buildStreamStart();
689     XmlElement* f = 0;
690     if (flag(StreamRemoteVer1) && flag(StreamLocalVer1))
691 	f = m_features.buildStreamFeatures();
692     if (f && caps)
693 	f->addChild(caps);
694     else
695 	TelEngine::destruct(caps);
696     State newState = Features;
697     if (m_type == c2s) {
698 	// Change stream state to Running if authenticated and there is no required
699 	// feature to negotiate
700 	if (flag(StreamAuthenticated) && !firstRequiredFeature())
701 	    newState = Running;
702     }
703     else if (m_type == s2s) {
704 	// Change stream state to Running if authenticated and features list is empty
705 	if (flag(StreamAuthenticated) && !m_features.skipNull())
706 	    newState = Running;
707     }
708     else if (m_type == cluster) {
709 	// Change stream state to Running if authenticated and features list is empty
710 	if (flag(StreamAuthenticated) && !m_features.skipNull())
711 	    newState = Running;
712     }
713     sendStreamXml(newState,s,f);
714 }
715 
// Authenticate an incoming stream
// ok: authentication result
// rsp: data used to build the SASL success response text
// error: the error sent to the remote party on c2s failure
// username, resource: used to set the remote party on c2s success
// id: the id of the 'iq' response for non SASL authentication
// Handled only for incoming streams in Auth state
// The SASL data is always released on exit
// Returns false if not handled, for s2s streams or on failure to send the response
// NOTE(review): 'ok' is not reset when the stream is terminated on c2s success
//  paths: the authenticated flag is set and true is returned - confirm intended
bool JBStream::authenticated(bool ok, const String& rsp, XMPPError::Type error,
    const char* username, const char* id, const char* resource)
{
    Lock lock(this);
    if (m_state != Auth || !incoming())
	return false;
    DDebug(this,DebugAll,"authenticated(%s,'%s',%s) local=%s [%p]",
	String::boolText(ok),rsp.safe(),XMPPUtils::s_error[error].c_str(),
	m_local.c_str(),this);
    if (ok) {
	if (m_type == c2s) {
	    if (m_sasl) {
		// Set remote party node if provided
		if (!TelEngine::null(username)) {
		    m_remote.set(username,m_local.domain(),"");
		    Debug(this,DebugAll,"Remote party set to '%s' [%p]",m_remote.c_str(),this);
		}
		// Send SASL success and wait for the stream to be restarted
		String text;
		m_sasl->buildAuthRspReply(text,rsp);
		XmlElement* s = XMPPUtils::createElement(XmlTag::Success,
		    XMPPNamespace::Sasl,text);
		ok = sendStreamXml(WaitStart,s);
	    }
	    else if (m_features.get(XMPPNamespace::IqAuth)) {
		// Set remote party if not provided
		if (!TelEngine::null(username))
		    m_remote.set(username,m_local.domain(),resource);
		else
		    m_remote.resource(resource);
		// Non SASL authentication requires a full remote JID
		if (m_remote.isFull()) {
		    Debug(this,DebugAll,"Remote party set to '%s' [%p]",m_remote.c_str(),this);
		    XmlElement* rsp = XMPPUtils::createIqResult(0,0,id,
			XMPPUtils::createElement(XmlTag::Query,XMPPNamespace::IqAuth));
		    ok = sendStreamXml(Running,rsp);
		    // Failed to send: restore remote party to local domain
		    if (!ok)
			m_remote.set(m_local.domain());
		}
		else
		    terminate(0,true,0,XMPPError::Internal);
	    }
	    else
		terminate(0,true,0,XMPPError::Internal);
	}
	else if (m_type == s2s)
	    ok = false;
	else if (m_type == comp) {
	    XmlElement* rsp = XMPPUtils::createElement(XmlTag::Handshake);
	    ok = sendStreamXml(Running,rsp);
	}
	if (ok) {
	    // Authenticated: authentication features are no longer advertised
	    m_features.remove(XMPPNamespace::Sasl);
	    m_features.remove(XMPPNamespace::IqAuth);
	    setFlags(StreamAuthenticated);
	}
    }
    else {
	if (m_type == c2s) {
	    // Send the failure and go back to Features state
	    XmlElement* rsp = 0;
	    if (m_sasl)
		rsp = XMPPUtils::createFailure(XMPPNamespace::Sasl,error);
	    else {
		rsp = XMPPUtils::createIq(XMPPUtils::IqError,0,0,id);
		if (TelEngine::null(id))
		    rsp->addChild(XMPPUtils::createElement(XmlTag::Query,XMPPNamespace::IqAuth));
		rsp->addChild(XMPPUtils::createError(XMPPError::TypeAuth,error));
	    }
	    ok = sendStreamXml(Features,rsp);
	}
	else if (m_type == s2s)
	    ok = false;
	else if (m_type == comp)
	    terminate(0,true,0,XMPPError::NotAuthorized);
    }
    TelEngine::destruct(m_sasl);
    return ok;
}
793 
// Terminate the stream. Send stream end tag or error.
// Reset the stream. Deref stream if destroying.
// location: termination location (see s_location). A stream error is sent
//  only if location is less than 1
// destroy: true to destroy the stream. Forced by 'final' or NoAutoRestart flag
// xml: optional element attached to the generated event. Consumed
// error, reason, content: data used to build the sent stream error and the event text
// final: true if called when the stream is destroyed: no event is generated
// genEvent: true to generate a Destroy/Terminated event
// The stream state is changed to Destroy or Idle
void JBStream::terminate(int location, bool destroy, XmlElement* xml, int error,
    const char* reason, bool final, bool genEvent, const char* content)
{
    XDebug(this,DebugAll,"terminate(%d,%u,%p,%u,%s,%u) state=%s [%p]",
	location,destroy,xml,error,reason,final,stateName(),this);
    Lock lock(this);
    // Drop all pending outgoing data and any postponed termination
    m_pending.clear();
    m_outXmlCompress.clear();
    resetPostponedTerminate();
    // Already in destroy
    if (state() == Destroy) {
	TelEngine::destruct(xml);
	return;
    }
    bool sendEndTag = true;
    destroy = destroy || final || flag(NoAutoRestart);
    // Set error flag
    // Terminating outside Running state is always handled as error
    if (state() == Running) {
	if (error != XMPPError::NoError)
	    setFlags(InError);
	else
	    resetFlags(InError);
    }
    else
	setFlags(InError);
    if (flag(InError)) {
	// Reset re-connect counter if not internal policy error
	if (location || error != XMPPError::Policy)
	    m_restart = 0;
    }
    if (error == XMPPError::NoError && m_engine->exiting())
	error = XMPPError::Shutdown;
    // Last check for sendEndTag
    if (sendEndTag) {
	// Prohibited states or socket read/write error
	if (m_state == Destroy || m_state == Securing || m_state == Connecting)
	    sendEndTag = false;
	else if (error == XMPPError::SocketError) {
	    sendEndTag = false;
	    reason = "I/O error";
	}
    }
    Debug(this,DebugAll,
	"Terminate by '%s' state=%s destroy=%u error=%s reason='%s' final=%u [%p]",
	lookup(location,s_location),stateName(),destroy,
	XMPPUtils::s_error[error].c_str(),reason,final,this);
    if (sendEndTag) {
	// Incoming stream in Starting state: the stream start must be sent
	//  before the stream end
	XmlElement* start = 0;
	if (m_state == Starting && incoming())
	    start = buildStreamStart();
	XmlElement* end = new XmlElement(String("/stream:stream"),false);
	if (error != XMPPError::NoError && location < 1) {
	    XmlElement* e = XMPPUtils::createStreamError(error,reason,content);
	    if (!start)
		sendStreamXml(m_state,e,end);
	    else
		sendStreamXml(m_state,start,e,end);
	}
	else {
	    if (!start)
		sendStreamXml(m_state,end);
	    else
		sendStreamXml(m_state,start,end);
	}
    }
    resetConnection();
    m_outStreamXml.clear();
    m_outStreamXmlCompress.clear();

    // Always set termination event, except when called from destructor
    if (genEvent && !(final || m_terminateEvent)) {
	// TODO: Cancel all outgoing elements without id
	m_terminateEvent = new JBEvent(destroy ? JBEvent::Destroy : JBEvent::Terminated,
	    this,xml);
	xml = 0;
	// Set the event text from reason or error name if not already set
	if (!m_terminateEvent->m_text) {
	    if (TelEngine::null(reason))
		m_terminateEvent->m_text = XMPPUtils::s_error[error];
	    else
		m_terminateEvent->m_text = reason;
	}
    }
    // Release the element if not attached to the event
    TelEngine::destruct(xml);

    changeState(destroy ? Destroy : Idle);
}
882 
// Close the stream. Release memory
// Terminates the stream (final: no event is generated), resets the connection,
//  removes the stream from engine and releases the termination event
void JBStream::destroyed()
{
    terminate(0,true,0,XMPPError::NoError,"",true);
    resetConnection();
    if (m_engine)
	m_engine->removeStream(this,false);
    TelEngine::destruct(m_terminateEvent);
    DDebug(this,DebugAll,"Destroyed local=%s remote=%s [%p]",
	m_local.safe(),m_remote.safe(),this);
    RefObject::destroyed();
}
895 
896 // Check if stream state processor can continue
897 // This method is called from getEvent() with the stream locked
canProcess(u_int64_t time)898 bool JBStream::canProcess(u_int64_t time)
899 {
900     if (outgoing()) {
901 	// Increase stream restart counter if it's time to and should auto restart
902 	if (!flag(NoAutoRestart) && m_timeToFillRestart < time) {
903 	    m_timeToFillRestart = time + m_engine->m_restartUpdInterval;
904 	    if (m_restart < m_engine->m_restartMax) {
905 		m_restart++;
906 		DDebug(this,DebugAll,"Restart count set to %u max=%u [%p]",
907 		    m_restart,m_engine->m_restartMax,this);
908 	    }
909 	}
910 	if (state() == Idle) {
911 	    // Re-connect
912 	    bool conn = (m_connectStatus > JBConnect::Start);
913 	    if (!conn && m_restart) {
914 		// Don't connect non client/component or cluster if we are in error and
915 		//  have nothing to send
916 		if (m_type != c2s && m_type != comp && m_type != cluster &&
917 		    flag(InError) && !m_pending.skipNull())
918 		    return false;
919 		conn = true;
920 		m_restart--;
921 	    }
922 	    if (conn) {
923 		resetFlags(InError);
924 		changeState(Connecting);
925 		m_engine->connectStream(this);
926 		return false;
927 	    }
928 	    // Destroy if not auto-restarting
929 	    if (flag(NoAutoRestart)) {
930 		terminate(0,true,0);
931 		return false;
932 	    }
933 	}
934     }
935     else if (state() == Idle && flag(NoAutoRestart)) {
936 	terminate(0,true,0);
937 	return false;
938     }
939     return true;
940 }
941 
942 // Process stream state. Get XML from parser's queue and process it
943 // This method is called from getEvent() with the stream locked
process(u_int64_t time)944 void JBStream::process(u_int64_t time)
945 {
946     if (!m_xmlDom)
947 	return;
948     XDebug(this,DebugAll,"JBStream::process() [%p]",this);
949     while (true) {
950 	sendPending();
951 	if (m_terminateEvent)
952 	    break;
953 
954 	// Lock the parser to obtain the root and/or child
955 	// Unlock it before processing received element
956 	Lock lockDoc(m_socketMutex);
957 	XmlDocument* doc = m_xmlDom ? m_xmlDom->document() : 0;
958 	XmlElement* root = doc ? doc->root(false) : 0;
959 	if (!root)
960 	    break;
961 
962 	if (m_state == WaitStart) {
963 	    // Print the declaration
964 	    XmlDeclaration* dec = doc->declaration();
965 	    if (dec)
966 		m_engine->printXml(this,false,*dec);
967 	    XmlElement xml(*root);
968 	    lockDoc.drop();
969 	    // Print the root. Make sure we don't print its children
970 	    xml.clearChildren();
971 	    m_engine->printXml(this,false,xml);
972 	    // Check if valid
973 	    if (!XMPPUtils::isTag(xml,XmlTag::Stream,XMPPNamespace::Stream)) {
974 		String* ns = xml.xmlns();
975 		Debug(this,DebugMild,"Received invalid stream root '%s' namespace='%s' [%p]",
976 		    xml.tag(),TelEngine::c_safe(ns),this);
977 		terminate(0,true,0);
978 		break;
979 	    }
980 	    // Check 'from' and 'to'
981 	    JabberID from;
982 	    JabberID to;
983 	    if (!getJids(&xml,from,to))
984 		break;
985 	    DDebug(this,DebugAll,"Processing root '%s' in state %s [%p]",
986 		xml.tag(),stateName(),this);
987 	    processStart(&xml,from,to);
988 	    break;
989 	}
990 
991 	XmlElement* xml = root->pop();
992 	if (!xml) {
993 	    if (root->completed())
994 		socketSetCanRead(false);
995 	    if (m_events.skipNull())
996 	        break;
997 	    if (!root->completed()) {
998 		if (m_ppTerminate && !(m_pending.skipNull() && socketCanWrite())) {
999 		    lockDoc.drop();
1000 		    postponedTerminate();
1001 		}
1002 		break;
1003 	    }
1004 	    DDebug(this,DebugAll,"Remote closed the stream in state %s [%p]",
1005 		stateName(),this);
1006 	    lockDoc.drop();
1007 	    resetPostponedTerminate();
1008 	    terminate(1,false,0);
1009 	    break;
1010 	}
1011 	lockDoc.drop();
1012 
1013 	// Process received element
1014 	// Print it
1015 	m_engine->printXml(this,false,*xml);
1016 	// Check stream termination
1017 	if (streamError(xml))
1018 	    break;
1019 	// Check 'from' and 'to'
1020 	JabberID from;
1021 	JabberID to;
1022 	if (!getJids(xml,from,to))
1023 	    break;
1024 	// Restart the idle timer
1025 	setIdleTimer(time);
1026 	// Check if a received stanza is valid and allowed in current state
1027 	if (!checkStanzaRecv(xml,from,to))
1028 	    break;
1029 
1030 	DDebug(this,DebugAll,"Processing (%p,%s) in state %s [%p]",
1031 	    xml,xml->tag(),stateName(),this);
1032 
1033 	// Process here dialback verify
1034 	if (m_type == s2s && isDbVerify(*xml)) {
1035 	    switch (state()) {
1036 		case Running:
1037 		case Features:
1038 		case Starting:
1039 		case Challenge:
1040 		case Auth:
1041 		    m_events.append(new JBEvent(JBEvent::DbVerify,this,xml,from,to));
1042 		    break;
1043 		default:
1044 		    dropXml(xml,"dialback verify in unsupported state");
1045 	    }
1046 	    continue;
1047 	}
1048 
1049 	switch (m_state) {
1050 	    case Running:
1051 		processRunning(xml,from,to);
1052 		// Reset ping
1053 		setNextPing(true);
1054 		m_pingId = "";
1055 		break;
1056 	    case Features:
1057 		if (m_incoming)
1058 		    processFeaturesIn(xml,from,to);
1059 		else
1060 		    processFeaturesOut(xml,from,to);
1061 		break;
1062 	    case WaitStart:
1063 	    case Starting:
1064 		processStart(xml,from,to);
1065 		TelEngine::destruct(xml);
1066 		break;
1067 	    case Challenge:
1068 		processChallenge(xml,from,to);
1069 		break;
1070 	    case Auth:
1071 		processAuth(xml,from,to);
1072 		break;
1073 	    case WaitTlsRsp:
1074 		processWaitTlsRsp(xml,from,to);
1075 		break;
1076 	    case Register:
1077 		processRegister(xml,from,to);
1078 		break;
1079 	    case Compressing:
1080 		processCompressing(xml,from,to);
1081 		break;
1082 	    default:
1083 		dropXml(xml,"unhandled stream state in process()");
1084 	}
1085 	break;
1086     }
1087     XDebug(this,DebugAll,"JBStream::process() exiting [%p]",this);
1088 }
1089 
1090 // Process elements in Running state
processRunning(XmlElement * xml,const JabberID & from,const JabberID & to)1091 bool JBStream::processRunning(XmlElement* xml, const JabberID& from, const JabberID& to)
1092 {
1093     if (!xml)
1094 	return true;
1095     int t, ns;
1096     if (!XMPPUtils::getTag(*xml,t,ns))
1097 	return dropXml(xml,"failed to retrieve element tag");
1098     switch (t) {
1099 	case XmlTag::Message:
1100 	    if (ns != m_xmlns)
1101 		break;
1102 	    m_events.append(new JBEvent(JBEvent::Message,this,xml,from,to));
1103 	    return true;
1104 	case XmlTag::Presence:
1105 	    if (ns != m_xmlns)
1106 		break;
1107 	    m_events.append(new JBEvent(JBEvent::Presence,this,xml,from,to));
1108 	    return true;
1109 	case XmlTag::Iq:
1110 	    if (ns != m_xmlns)
1111 		break;
1112 	    checkPing(this,xml,m_pingId);
1113 	    m_events.append(new JBEvent(JBEvent::Iq,this,xml,from,to,xml->findFirstChild()));
1114 	    return true;
1115 	default:
1116 	    m_events.append(new JBEvent(JBEvent::Unknown,this,xml,from,to));
1117 	    return true;
1118     }
1119     // Invalid stanza namespace
1120     XmlElement* rsp = XMPPUtils::createError(xml,XMPPError::TypeModify,
1121 	XMPPError::InvalidNamespace,"Only stanzas in default namespace are allowed");
1122     sendStanza(rsp);
1123     return true;
1124 }
1125 
1126 // Check stream timeouts
1127 // This method is called from getEvent() with the stream locked, after
checkTimeouts(u_int64_t time)1128 void JBStream::checkTimeouts(u_int64_t time)
1129 {
1130     if (m_ppTerminateTimeout && m_ppTerminateTimeout <= time) {
1131 	m_ppTerminateTimeout = 0;
1132 	Debug(this,DebugAll,"Postponed termination timed out [%p]",this);
1133 	if (postponedTerminate())
1134 	    return;
1135     }
1136     // Running: check ping and idle timers
1137     if (m_state == Running) {
1138 	const char* reason = 0;
1139 	if (m_pingTimeout) {
1140 	    if (m_pingTimeout < time) {
1141 		Debug(this,DebugNote,"Ping stanza with id '%s' timed out [%p]",m_pingId.c_str(),this);
1142 		reason = "Ping timeout";
1143 	    }
1144 	}
1145 	else if (m_nextPing && time >= m_nextPing) {
1146 	    XmlElement* ping = setNextPing(false);
1147 	    if (ping) {
1148 		DDebug(this,DebugAll,"Sending ping with id=%s [%p]",m_pingId.c_str(),this);
1149 		if (!sendStanza(ping))
1150 		    m_pingId = "";
1151 	    }
1152 	    else {
1153 		resetPing();
1154 		m_pingId = "";
1155 	    }
1156 	}
1157 	if (m_idleTimeout && m_idleTimeout < time && !reason)
1158 	    reason = "Stream idle";
1159 	if (reason)
1160 	    terminate(0,m_incoming,0,XMPPError::ConnTimeout,reason);
1161 	return;
1162     }
1163     // Stream setup timer
1164     if (m_setupTimeout && m_setupTimeout < time) {
1165 	terminate(0,m_incoming,0,XMPPError::Policy,"Stream setup timeout");
1166 	return;
1167     }
1168     // Stream start timer
1169     if (m_startTimeout && m_startTimeout < time) {
1170 	terminate(0,m_incoming,0,XMPPError::Policy,"Stream start timeout");
1171 	return;
1172     }
1173     // Stream connect timer
1174     if (m_connectTimeout && m_connectTimeout < time) {
1175 	DDebug(this,DebugNote,"Connect timed out stat=%s [%p]",
1176 	    lookup(m_connectStatus,JBConnect::s_statusName),this);
1177 	// Don't terminate if there are more connect options
1178 	if (state() == Connecting && m_connectStatus > JBConnect::Start) {
1179 	    m_engine->stopConnect(toString());
1180 	    m_engine->connectStream(this);
1181 	}
1182 	else
1183 	    terminate(0,m_incoming,0,XMPPError::ConnTimeout,"Stream connect timeout");
1184 	return;
1185     }
1186 }
1187 
1188 // Reset the stream's connection. Build a new XML parser if the socket is valid
resetConnection(Socket * sock)1189 void JBStream::resetConnection(Socket* sock)
1190 {
1191     DDebug(this,DebugAll,"JBStream::resetConnection(%p) current=%p [%p]",
1192 	sock,m_socket,this);
1193     // Release the old one
1194     if (m_socket) {
1195 	m_socketMutex.lock();
1196 	m_socketFlags |= SocketWaitReset;
1197 	m_socketMutex.unlock();
1198 	// Wait for the socket to become available (not reading or writing)
1199 	Socket* tmp = 0;
1200 	while (true) {
1201 	    Lock lock(m_socketMutex);
1202 	    if (!(m_socket && (socketReading() || socketWriting()))) {
1203 		tmp = m_socket;
1204 		m_socket = 0;
1205 		m_socketFlags = 0;
1206 		if (m_xmlDom) {
1207 		    delete m_xmlDom;
1208 		    m_xmlDom = 0;
1209 		}
1210 		TelEngine::destruct(m_compress);
1211 		break;
1212 	    }
1213 	    lock.drop();
1214 	    Thread::yield(false);
1215 	}
1216 	if (tmp) {
1217 	    tmp->setLinger(-1);
1218 	    tmp->terminate();
1219 	    delete tmp;
1220 	}
1221     }
1222     resetPostponedTerminate();
1223     if (sock) {
1224 	Lock lock(m_socketMutex);
1225 	if (m_socket) {
1226 	    Debug(this,DebugWarn,"Duplicate attempt to set socket! [%p]",this);
1227 	    delete sock;
1228 	    return;
1229 	}
1230 	m_xmlDom = new XmlDomParser(debugName());
1231 	m_xmlDom->debugChain(this);
1232 	m_socket = sock;
1233 	if (debugAt(DebugAll)) {
1234 	    SocketAddr l, r;
1235 	    localAddr(l);
1236 	    remoteAddr(r);
1237 	    Debug(this,DebugAll,"Connection set local=%s:%d remote=%s:%d sock=%p [%p]",
1238 		l.host().c_str(),l.port(),r.host().c_str(),r.port(),m_socket,this);
1239 	}
1240 	m_socket->setReuse(true);
1241 	m_socket->setBlocking(false);
1242 	socketSetCanRead(true);
1243 	socketSetCanWrite(true);
1244     }
1245 }
1246 
1247 // Build a ping iq stanza
buildPing(const String & stanzaId)1248 XmlElement* JBStream::buildPing(const String& stanzaId)
1249 {
1250     return 0;
1251 }
1252 
1253 // Build a stream start XML element
buildStreamStart()1254 XmlElement* JBStream::buildStreamStart()
1255 {
1256     XmlElement* start = new XmlElement(XMPPUtils::s_tag[XmlTag::Stream],false);
1257     if (incoming())
1258 	start->setAttribute("id",m_id);
1259     XMPPUtils::setStreamXmlns(*start);
1260     start->setAttribute(XmlElement::s_ns,XMPPUtils::s_ns[m_xmlns]);
1261     start->setAttributeValid("from",m_local.bare());
1262     start->setAttributeValid("to",m_remote.bare());
1263     if (outgoing() || flag(StreamRemoteVer1))
1264 	start->setAttribute("version","1.0");
1265     start->setAttribute("xml:lang","en");
1266     return start;
1267 }
1268 
1269 // Process received elements in WaitStart state
1270 // WaitStart: Incoming: waiting for stream start
1271 //            Outgoing: idem (our stream start was already sent)
1272 // Return false if stream termination was initiated
processStart(const XmlElement * xml,const JabberID & from,const JabberID & to)1273 bool JBStream::processStart(const XmlElement* xml, const JabberID& from,
1274     const JabberID& to)
1275 {
1276     Debug(this,DebugStub,"JBStream::processStart(%s) [%p]",xml->tag(),this);
1277     return true;
1278 }
1279 
1280 // Process elements in Compressing state
1281 // Return false if stream termination was initiated
processCompressing(XmlElement * xml,const JabberID & from,const JabberID & to)1282 bool JBStream::processCompressing(XmlElement* xml, const JabberID& from,
1283     const JabberID& to)
1284 {
1285     XDebug(this,DebugAll,"JBStream::processCompressing() [%p]",this);
1286     int t = XmlTag::Count;
1287     int n = XMPPNamespace::Count;
1288     XMPPUtils::getTag(*xml,t,n);
1289     if (outgoing()) {
1290 	if (n != XMPPNamespace::Compress)
1291 	    return dropXml(xml,"expecting compression namespace");
1292 	// Expecting: compressed/failure
1293 	bool ok = (t == XmlTag::Compressed);
1294 	if (!ok && t != XmlTag::Failure)
1295 	    return dropXml(xml,"expecting compress response (compressed/failure)");
1296 	if (ok) {
1297 	    if (m_compress)
1298 		setFlags(StreamCompressed);
1299 	    else
1300 		return destroyDropXml(xml,XMPPError::Internal,"no compressor");
1301 	}
1302 	else {
1303 	    XmlElement* ch = xml->findFirstChild();
1304 	    Debug(this,DebugInfo,"Compress failed at remote party error=%s [%p]",
1305 		ch ? ch->tag() : "",this);
1306 	    TelEngine::destruct(m_compress);
1307 	}
1308 	TelEngine::destruct(xml);
1309 	// Restart the stream on success
1310 	if (ok) {
1311 	    XmlElement* s = buildStreamStart();
1312 	    return sendStreamXml(WaitStart,s);
1313 	}
1314 	// Compress failed: continue
1315 	JBServerStream* server = serverStream();
1316 	if (server)
1317 	    return server->sendDialback();
1318 	JBClientStream* client = clientStream();
1319 	if (client)
1320 	    return client->bind();
1321 	Debug(this,DebugNote,"Unhandled stream type in %s state [%p]",stateName(),this);
1322 	terminate(0,true,0,XMPPError::Internal);
1323 	return true;
1324     }
1325     // Authenticated incoming s2s waiting for compression or any other element
1326     if (type() == s2s && m_features.get(XMPPNamespace::CompressFeature)) {
1327 	if (t == XmlTag::Compress && n == XMPPNamespace::Compress)
1328 	    return handleCompressReq(xml);
1329 	// Change state to Running
1330 	changeState(Running);
1331 	return processRunning(xml,from,to);
1332     }
1333 
1334     return dropXml(xml,"not implemented");
1335 }
1336 
1337 // Process elements in Register state
processRegister(XmlElement * xml,const JabberID & from,const JabberID & to)1338 bool JBStream::processRegister(XmlElement* xml, const JabberID& from,
1339     const JabberID& to)
1340 {
1341     dropXml(xml,"can't process in this state");
1342     terminate(0,true,0,XMPPError::Internal);
1343     return false;
1344 }
1345 
1346 // Process elements in Auth state
processAuth(XmlElement * xml,const JabberID & from,const JabberID & to)1347 bool JBStream::processAuth(XmlElement* xml, const JabberID& from,
1348     const JabberID& to)
1349 {
1350     return dropXml(xml,"can't process in this state");
1351 }
1352 
1353 // Check if a received start start element's namespaces are correct.
processStreamStart(const XmlElement * xml)1354 bool JBStream::processStreamStart(const XmlElement* xml)
1355 {
1356     XDebug(this,DebugAll,"JBStream::processStreamStart() [%p]",this);
1357     if (m_state == Starting)
1358 	return true;
1359     changeState(Starting);
1360     if (!XMPPUtils::hasDefaultXmlns(*xml,m_xmlns)) {
1361 	Debug(this,DebugNote,"Received '%s' with invalid xmlns='%s' [%p]",
1362 	    xml->tag(),TelEngine::c_safe(xml->xmlns()),this);
1363 	terminate(0,m_incoming,0,XMPPError::InvalidNamespace);
1364 	return false;
1365     }
1366     XMPPError::Type error = XMPPError::NoError;
1367     const char* reason = 0;
1368     while (true) {
1369 	if (m_type != c2s && m_type != s2s && m_type != comp && m_type != cluster) {
1370 	    Debug(this,DebugStub,"processStreamStart() type %u not handled!",m_type);
1371 	    error = XMPPError::Internal;
1372 	    break;
1373 	}
1374 	// Check xmlns:stream
1375 	String* nsStr = xml->getAttribute("xmlns:stream");
1376 	if (!nsStr || *nsStr != XMPPUtils::s_ns[XMPPNamespace::Stream]) {
1377 	    Debug(this,DebugNote,"Received '%s' with invalid xmlns:stream='%s' [%p]",
1378 		xml->tag(),TelEngine::c_safe(nsStr),this);
1379 	    error = XMPPError::InvalidNamespace;
1380 	    break;
1381 	}
1382 	// Check version
1383 	String ver(xml->getAttribute("version"));
1384 	int remoteVersion = -1;
1385 	if (ver) {
1386 	    int pos = ver.find('.');
1387 	    if (pos > 0)
1388 		remoteVersion = ver.substr(0,pos).toInteger(-1);
1389 	}
1390 	if (remoteVersion == 1)
1391 	    setFlags(StreamRemoteVer1);
1392 	else if (remoteVersion < 1) {
1393 	    if (m_type == c2s)
1394 		XDebug(this,DebugAll,"c2s stream start with version < 1 [%p]",this);
1395 	    else if (m_type == s2s) {
1396 		// Accept invalid/unsupported version only if TLS is not required
1397 		if (!flag(TlsRequired)) {
1398 		    // Check dialback
1399 		    if (!xml->hasAttribute("xmlns:db",XMPPUtils::s_ns[XMPPNamespace::Dialback]))
1400 			error = XMPPError::InvalidNamespace;
1401 		}
1402 		else
1403 		    error = XMPPError::EncryptionRequired;
1404 	    }
1405 	    else if (m_type != comp)
1406 		error = XMPPError::Internal;
1407 	}
1408 	else if (remoteVersion > 1)
1409 	    error = XMPPError::UnsupportedVersion;
1410 	if (error != XMPPError::NoError) {
1411 	    Debug(this,DebugNote,"Unacceptable '%s' version='%s' error=%s [%p]",
1412 		xml->tag(),ver.c_str(),XMPPUtils::s_error[error].c_str(),this);
1413 	    break;
1414 	}
1415 	// Set stream id: generate one for incoming, get it from xml if outgoing
1416 	if (incoming()) {
1417 	    // Generate a random, variable length stream id
1418 	    MD5 md5(String((int)(int64_t)this));
1419 	    md5 << m_name << String((int)Time::msecNow());
1420 	    m_id = md5.hexDigest();
1421 	    m_id << "_" << String((int)Random::random());
1422 	}
1423 	else {
1424 	    m_id = xml->getAttribute("id");
1425 	    if (!m_id) {
1426 		Debug(this,DebugNote,"Received '%s' with empty stream id [%p]",
1427 		    xml->tag(),this);
1428 		reason = "Missing stream id";
1429 		error = XMPPError::InvalidId;
1430 		break;
1431 	    }
1432 	}
1433 	XDebug(this,DebugAll,"Stream id set to '%s' [%p]",m_id.c_str(),this);
1434 	break;
1435     }
1436     if (error == XMPPError::NoError)
1437 	return true;
1438     terminate(0,m_incoming,0,error,reason);
1439     return false;
1440 }
1441 
1442 // Handle an already checked (tag and namespace) compress request
1443 // Respond to it. Change stream state on success
handleCompressReq(XmlElement * xml)1444 bool JBStream::handleCompressReq(XmlElement* xml)
1445 {
1446     XMPPError::Type error = XMPPError::UnsupportedMethod;
1447     State newState = state();
1448     XmlElement* rsp = 0;
1449     XmlElement* m = XMPPUtils::findFirstChild(*xml,XmlTag::Method,
1450 	XMPPNamespace::Compress);
1451     if (m) {
1452 	// Get and check the method
1453 	const String& method = m->getText();
1454 	XMPPFeatureCompress* c = m_features.getCompress();
1455 	if (method && c && c->hasMethod(method)) {
1456 	    // Build the (de)compressor
1457 	    Lock lock(m_socketMutex);
1458 	    m_engine->compressStream(this,method);
1459 	    if (m_compress) {
1460 		newState = WaitStart;
1461 		setFlags(SetCompressed);
1462 		m_features.remove(XMPPNamespace::CompressFeature);
1463 		rsp = XMPPUtils::createElement(XmlTag::Compressed,XMPPNamespace::Compress);
1464 	    }
1465 	    else
1466 		error = XMPPError::SetupFailed;
1467 	}
1468     }
1469     TelEngine::destruct(xml);
1470     if (!rsp)
1471 	rsp = XMPPUtils::createFailure(XMPPNamespace::Compress,error);
1472     return sendStreamXml(newState,rsp);
1473 }
1474 
1475 // Check if a received element is a stream error one
streamError(XmlElement * xml)1476 bool JBStream::streamError(XmlElement* xml)
1477 {
1478     if (!(xml && XMPPUtils::isTag(*xml,XmlTag::Error,XMPPNamespace::Stream)))
1479 	return false;
1480     String text;
1481     String error;
1482     String content;
1483     XMPPUtils::decodeError(xml,XMPPNamespace::StreamError,&error,&text,&content);
1484     Debug(this,DebugAll,"Received stream error '%s' content='%s' text='%s' in state %s [%p]",
1485 	error.c_str(),content.c_str(),text.c_str(),stateName(),this);
1486     int err = XMPPUtils::s_error[error];
1487     if (err >= XMPPError::Count)
1488 	err = XMPPError::NoError;
1489     String rAddr;
1490     int rPort = 0;
1491     if (err == XMPPError::SeeOther && content) {
1492 	if (m_redirectCount < m_redirectMax) {
1493 	    int pos = content.rfind(':');
1494 	    if (pos >= 0) {
1495 		rAddr = content.substr(0,pos);
1496 		if (rAddr) {
1497 		    rPort = content.substr(pos + 1).toInteger(0);
1498 		    if (rPort < 0)
1499 			rPort = 0;
1500 		}
1501 	    }
1502 	    else
1503 		rAddr = content;
1504 	    if (rAddr) {
1505 		// Check if the connect destination is different
1506 		SocketAddr remoteIp;
1507 		remoteAddr(remoteIp);
1508 		bool sameDest = (rAddr == serverHost()) || (rAddr == m_connectAddr) || (rAddr == remoteIp.host());
1509 		if (sameDest) {
1510 		    sameDest = ((rPort > 0 ? rPort : XMPP_C2S_PORT) == remoteIp.port());
1511 		    if (sameDest) {
1512 			Debug(this,DebugNote,"Ignoring redirect to same destination [%p]",this);
1513 			rAddr = "";
1514 		    }
1515 		}
1516 	    }
1517 	}
1518     }
1519     terminate(1,false,xml,err,text,false,rAddr.null());
1520     setRedirect(rAddr,rPort);
1521     if (rAddr) {
1522 	resetFlags(InError);
1523 	resetConnectStatus();
1524 	changeState(Connecting);
1525 	m_engine->connectStream(this);
1526 	setRedirect();
1527     }
1528     return true;
1529 }
1530 
1531 // Check if a received element has valid 'from' and 'to' jid attributes
getJids(XmlElement * xml,JabberID & from,JabberID & to)1532 bool JBStream::getJids(XmlElement* xml, JabberID& from, JabberID& to)
1533 {
1534     if (!xml)
1535 	return true;
1536     from = xml->getAttribute("from");
1537     to = xml->getAttribute("to");
1538     XDebug(this,DebugAll,"Got jids xml='%s' from='%s' to='%s' [%p]",
1539 	xml->tag(),from.c_str(),to.c_str(),this);
1540     if (to.valid() && from.valid())
1541 	return true;
1542     Debug(this,DebugNote,"Received '%s' with bad from='%s' or to='%s' [%p]",
1543 	xml->tag(),from.c_str(),to.c_str(),this);
1544     terminate(0,m_incoming,xml,XMPPError::BadAddressing);
1545     return false;
1546 }
1547 
1548 // Check if a received element is a presence, message or iq qualified by the stream
1549 //   namespace and the stream is not authenticated
1550 // Validate 'from' for c2s streams
1551 // Validate s2s 'to' domain and 'from' jid
checkStanzaRecv(XmlElement * xml,JabberID & from,JabberID & to)1552 bool JBStream::checkStanzaRecv(XmlElement* xml, JabberID& from, JabberID& to)
1553 {
1554     if (!XMPPUtils::isStanza(*xml))
1555 	return true;
1556 
1557     // RFC 3920bis 5.2: Accept stanzas only if the stream was authenticated
1558     // Accept IQs in jabber:iq:register namespace
1559     // Accept IQs in jabber:iq:auth namespace
1560     // They might be received on a non authenticated stream)
1561     if (!flag(StreamAuthenticated)) {
1562 	bool isIq = XMPPUtils::isTag(*xml,XmlTag::Iq,m_xmlns);
1563 	bool valid = isIq && XMPPUtils::findFirstChild(*xml,XmlTag::Count,
1564 	    XMPPNamespace::IqRegister);
1565 	JBClientStream* c2s = clientStream();
1566 	if (!valid && c2s) {
1567 	    // Outgoing client stream: check register responses
1568 	    // Incoming client stream: check auth stanzas
1569 	    if (outgoing())
1570 		valid = c2s->isRegisterId(*xml);
1571 	    else
1572 		valid = isIq && XMPPUtils::findFirstChild(*xml,XmlTag::Count,
1573 		    XMPPNamespace::IqAuth);
1574 	}
1575 	if (!valid) {
1576 	    terminate(0,false,xml,XMPPError::NotAuthorized,
1577 		"Can't accept stanza on non authorized stream");
1578 	    return false;
1579 	}
1580     }
1581 
1582     switch (m_type) {
1583 	case c2s:
1584 	    if (m_incoming) {
1585 		// Check for valid from
1586 		if (from && !m_remote.match(from)) {
1587 		    XmlElement* e = XMPPUtils::createError(xml,
1588 			XMPPError::TypeModify,XMPPError::BadAddressing);
1589 		    sendStanza(e);
1590 		    return false;
1591 		}
1592 		// Make sure the upper layer always has the full jid
1593 		if (!from)
1594 		    from = m_remote;
1595 		else if (!from.resource())
1596 		    from.resource(m_remote.resource());
1597 	    }
1598 	    else {
1599 		XDebug(this,DebugStub,
1600 		    "Possible checkStanzaRecv() unhandled outgoing c2s stream [%p]",this);
1601 	    }
1602 	    break;
1603 	case comp:
1604 	case s2s:
1605 	    // RFC 3920bis 9.1.1.2 and 9.1.2.1:
1606 	    // Validate 'to' and 'from'
1607 	    // Accept anything for component streams
1608 	    if (!(to && from)) {
1609 		terminate(0,m_incoming,xml,XMPPError::BadAddressing);
1610 		return false;
1611 	    }
1612 	    // TODO: Find an outgoing stream and send stanza error to the remote server
1613 	    //  instead of terminating the stream
1614 	    if (m_type == s2s) {
1615 		if (incoming()) {
1616 		    // Accept stanzas only for validated domains
1617 		    if (!serverStream()->hasRemoteDomain(from.domain())) {
1618 			terminate(0,m_incoming,xml,XMPPError::BadAddressing);
1619 			return false;
1620 		    }
1621 		}
1622 		else {
1623 		    // We should not receive any stanza on outgoing s2s
1624 		    terminate(0,m_incoming,xml,XMPPError::NotAuthorized);
1625 		    return false;
1626 		}
1627 		if (m_local != to.domain()) {
1628 		    terminate(0,m_incoming,xml,XMPPError::BadAddressing);
1629 		    return false;
1630 		}
1631 	    }
1632 	    else if (from.domain() != m_remote.domain()) {
1633 		terminate(0,m_incoming,xml,XMPPError::InvalidFrom);
1634 		return false;
1635 	    }
1636 	    break;
1637 	case cluster:
1638 	    break;
1639 	default:
1640 	    Debug(this,DebugStub,"checkStanzaRecv() unhandled stream type=%s [%p]",
1641 		typeName(),this);
1642     }
1643     return true;
1644 }
1645 
1646 // Change stream state. Reset state depending data
changeState(State newState,u_int64_t time)1647 void JBStream::changeState(State newState, u_int64_t time)
1648 {
1649     if (newState == m_state)
1650 	return;
1651     Debug(this,DebugAll,"Changing state from '%s' to '%s' [%p]",
1652 	stateName(),lookup(newState,s_stateName),this);
1653     // Set/reset state depending data
1654     switch (m_state) {
1655 	case Running:
1656 	    resetPing();
1657 	    m_pingId = "";
1658 	    break;
1659 	case WaitStart:
1660 	    // Reset connect status if not timeout
1661 	    if (m_startTimeout && m_startTimeout > time)
1662 		resetConnectStatus();
1663 	    m_startTimeout = 0;
1664 	    break;
1665 	case Securing:
1666 	    setFlags(StreamSecured);
1667 	    socketSetCanRead(true);
1668 	    break;
1669 	case Connecting:
1670 	    m_connectTimeout = 0;
1671 	    m_engine->stopConnect(toString());
1672 	    break;
1673 	case Register:
1674 	    if (type() == c2s)
1675 		clientStream()->m_registerReq = 0;
1676 	    break;
1677 	default: ;
1678     }
1679     switch (newState) {
1680 	case WaitStart:
1681 	    if (m_engine->m_setupTimeout && m_type != cluster)
1682 		m_setupTimeout = time + timerMultiplier(this) * m_engine->m_setupTimeout;
1683 	    else
1684 		m_setupTimeout = 0;
1685 	    m_startTimeout = time + timerMultiplier(this) * m_engine->m_startTimeout;
1686 	    DDebug(this,DebugAll,"Set timeouts start=" FMT64 " setup=" FMT64 " [%p]",
1687 		m_startTimeout,m_setupTimeout,this);
1688 	    if (m_xmlDom) {
1689 		Lock lck(m_socketMutex);
1690 		if (m_xmlDom) {
1691 		    m_xmlDom->reset();
1692 		    DDebug(this,DebugAll,"XML parser reset [%p]",this);
1693 		}
1694 	    }
1695 	    break;
1696 	case Idle:
1697 	    m_events.clear();
1698 	case Destroy:
1699 	    m_id = "";
1700 	    m_setupTimeout = 0;
1701 	    m_startTimeout = 0;
1702 	    // Reset all internal flags
1703 	    resetFlags(InternalFlags);
1704 	    if (type() == c2s)
1705 		clientStream()->m_registerReq = 0;
1706 	    break;
1707 	case Running:
1708 	    resetConnectStatus();
1709 	    setRedirect();
1710 	    m_redirectCount = 0;
1711 	    m_pingInterval = m_engine->m_pingInterval;
1712 	    setNextPing(true);
1713 	    setFlags(StreamSecured | StreamAuthenticated);
1714 	    resetFlags(InError);
1715 	    m_setupTimeout = 0;
1716 	    m_startTimeout = 0;
1717 	    if (m_state != Running)
1718 		m_events.append(new JBEvent(JBEvent::Running,this,0));
1719 	    break;
1720 	case Securing:
1721 	    socketSetCanRead(false);
1722 	    break;
1723 	default: ;
1724     }
1725     m_state = newState;
1726     if (m_state == Running)
1727 	setIdleTimer(time);
1728 }
1729 
1730 // Check if the stream compress flag is set and compression was offered by remote party
checkCompress()1731 XmlElement* JBStream::checkCompress()
1732 {
1733     if (flag(StreamCompressed) || !flag(Compress))
1734 	return 0;
1735     XMPPFeatureCompress* c = m_features.getCompress();
1736     if (!c)
1737 	return 0;
1738     if (!(c && c->methods()))
1739 	return 0;
1740     XmlElement* x = 0;
1741     Lock lock(m_socketMutex);
1742     m_engine->compressStream(this,c->methods());
1743     if (m_compress && m_compress->format()) {
1744 	x = XMPPUtils::createElement(XmlTag::Compress,XMPPNamespace::Compress);
1745 	x->addChild(XMPPUtils::createElement(XmlTag::Method,m_compress->format()));
1746     }
1747     else
1748 	TelEngine::destruct(m_compress);
1749     return x;
1750 }
1751 
1752 // Check for pending events. Set the last event
checkPendingEvent()1753 void JBStream::checkPendingEvent()
1754 {
1755     if (m_lastEvent)
1756 	return;
1757     if (!m_terminateEvent) {
1758 	GenObject* gen = m_events.remove(false);
1759 	if (gen)
1760 	    m_lastEvent = static_cast<JBEvent*>(gen);
1761 	return;
1762     }
1763     // Check for register events and raise them before the terminate event
1764     for (ObjList* o = m_events.skipNull(); o; o = o->skipNext()) {
1765 	JBEvent* ev = static_cast<JBEvent*>(o->get());
1766 	if (ev->type() == JBEvent::RegisterOk || ev->type() == JBEvent::RegisterFailed) {
1767 	    m_lastEvent = ev;
1768 	    m_events.remove(ev,false);
1769 	    return;
1770 	}
1771     }
1772     m_lastEvent = m_terminateEvent;
1773     m_terminateEvent = 0;
1774 }
1775 
// Send pending stream XML or stanzas
// Pending stream XML (m_outStreamXml) is always tried first. The first pending
// stanza is sent only in Running state and only if 'streamOnly' is false.
// When the StreamCompressed flag is set the compressed buffers are written
// instead of the plain text ones
// @param streamOnly True to send only the pending stream XML
// @return False if there is no socket or if a socket write or a compress failed.
//  True otherwise (including partial writes and nothing to send)
bool JBStream::sendPending(bool streamOnly)
{
    if (!m_socket)
	return false;
    XDebug(this,DebugAll,"JBStream::sendPending() [%p]",this);
    // Remember the compression status now: the flag may be set below
    bool noComp = !flag(StreamCompressed);
    // Always try to send pending stream XML first
    if (m_outStreamXml) {
	const void* buf = 0;
	unsigned int len = 0;
	if (noComp) {
	    buf = m_outStreamXml.c_str();
	    len = m_outStreamXml.length();
	}
	else {
	    buf = m_outStreamXmlCompress.data();
	    len = m_outStreamXmlCompress.length();
	}
	// writeSocket() updates 'len' with the number of bytes actually written
	if (!writeSocket(buf,len))
	    return false;
	// 'all' is set when the whole buffer was written
	bool all = false;
	if (noComp) {
	    all = (len == m_outStreamXml.length());
	    if (all)
		m_outStreamXml.clear();
	    else
		m_outStreamXml = m_outStreamXml.substr(len);
	}
	else {
	    all = (len == m_outStreamXmlCompress.length());
	    if (all) {
		m_outStreamXml.clear();
		m_outStreamXmlCompress.clear();
	    }
	    else
		// Partial write: discard the 'len' bytes already written
		// (negative length: presumably cuts from buffer start)
		m_outStreamXmlCompress.cut(-(int)len);
	}
	// Start TLS now for incoming streams
	if (m_incoming && m_state == Securing) {
	    if (all) {
		m_engine->encryptStream(this);
		setFlags(StreamTls);
		socketSetCanRead(true);
	    }
	    return true;
	}
	// Check set StreamCompressed flag if all data sent
	if (all && flag(SetCompressed))
	    setFlags(StreamCompressed);
	// Don't send stanzas if requested so or stream XML is still pending
	if (streamOnly || !all)
	    return true;
    }

    // Send pending stanzas
    if (m_state != Running || streamOnly)
	return true;
    ObjList* obj = m_pending.skipNull();
    if (!obj)
	return true;
    XmlElementOut* eout = static_cast<XmlElementOut*>(obj->get());
    XmlElement* xml = eout->element();
    if (!xml) {
	// Nothing to send for this item: remove it from the list
	m_pending.remove(eout,true);
	return true;
    }
    // Remember if we already tried to send this element
    bool sent = eout->sent();
    const void* buf = 0;
    unsigned int len = 0;
    if (noComp)
	buf = (const void*)eout->getData(len);
    else {
	if (!sent) {
	    // Make sure the buffer is prepared for sending
	    eout->getData(len);
	    m_outXmlCompress.clear();
	    if (!compress(eout))
		return false;
	}
	// Send the compressed data (or what was left from a previous partial write)
	buf = m_outXmlCompress.data();
	len = m_outXmlCompress.length();
    }
    // Print the element only if it's the first time we try to send it
    if (!sent)
	m_engine->printXml(this,true,*xml);
    if (writeSocket(buf,len)) {
	// Nothing written: try again later
	if (!len)
	    return true;
	setIdleTimer();
	// Adjust element's buffer. Remove it from list on completion
	unsigned int rest = 0;
	if (noComp) {
	    eout->dataSent(len);
	    rest = eout->dataCount();
	}
	else {
	    m_outXmlCompress.cut(-(int)len);
	    rest = m_outXmlCompress.length();
	}
	if (!rest) {
	    DDebug(this,DebugAll,"Sent element (%p,%s) [%p]",xml,xml->tag(),this);
	    m_pending.remove(eout,true);
	}
	else
	    DDebug(this,DebugAll,"Partially sent element (%p,%s) sent=%u rest=%u [%p]",
		xml,xml->tag(),len,rest,this);
	return true;
    }
    // Error
    Debug(this,DebugNote,"Failed to send (%p,%s) [%p]",xml,xml->tag(),this);
    return false;
}
1888 
// Write data to socket
// The socket mutex is held only while checking/updating the socket flags,
// not during the write itself
// @param data Buffer to write
// @param len Number of bytes to write. On exit it is set to the number of bytes
//  actually written (0 on error or if nothing was written)
// @return True on success (including a retryable write error, a socket waiting
//  to be reset or deleted while writing). False if the socket can't be written
//  or on non retryable socket error
bool JBStream::writeSocket(const void* data, unsigned int& len)
{
    // Nothing to write
    if (!(data && len)) {
	len = 0;
	return true;
    }
    Lock lock(m_socketMutex);
    if (!socketCanWrite()) {
	len = 0;
	// Request termination only once: when the write flag is still set
	if (0 != (m_socketFlags & SocketCanWrite)) {
	    socketSetCanWrite(false);
	    postponeTerminate(0,m_incoming,XMPPError::SocketError,"No socket");
	}
	return false;
    }
    // Signal a write in progress and release the mutex while writing
    socketSetWriting(true);
    lock.drop();
#ifdef JBSTREAM_DEBUG_SOCKET
    if (!flag(StreamCompressed))
	Debug(this,DebugInfo,"Sending %s [%p]",(const char*)data,this);
    else
	Debug(this,DebugInfo,"Sending %u compressed bytes [%p]",len,this);
#endif
    int w = m_socket->writeData(data,len);
    if (w != Socket::socketError())
	len = w;
    else
	len = 0;
#ifdef JBSTREAM_DEBUG_SOCKET
    if (!flag(StreamCompressed)) {
	String sent((const char*)data,len);
	Debug(this,DebugInfo,"Sent %s [%p]",sent.c_str(),this);
    }
    else
	Debug(this,DebugInfo,"Sent %u compressed bytes [%p]",len,this);
#endif
    // Re-acquire the mutex to check what happened while writing
    Lock lck(m_socketMutex);
    // Check if the connection is waiting to be reset
    if (socketWaitReset()) {
	socketSetWriting(false);
	return true;
    }
    // Check if something changed
    if (!(m_socket && socketWriting())) {
	Debug(this,DebugAll,"Socket deleted while writing [%p]",this);
	return true;
    }
    socketSetWriting(false);
    // Done if data was written or the operation can be retried
    if (w != Socket::socketError() || m_socket->canRetry())
	return true;
    // Non retryable error: build the reason and request stream termination
    socketSetCanWrite(false);
    String tmp;
    Thread::errorString(tmp,m_socket->error());
    String reason;
    reason << "Socket send error: " << tmp << " (" << m_socket->error() << ")";
    Debug(this,DebugWarn,"%s [%p]",reason.c_str(),this);
    // Release the socket mutex before asking for termination
    lck.drop();
    postponeTerminate(0,m_incoming,XMPPError::SocketError,reason);
    return false;
}
1950 
1951 // Update stream flags and remote connection data from engine
updateFromRemoteDef()1952 void JBStream::updateFromRemoteDef()
1953 {
1954     m_engine->lock();
1955     JBRemoteDomainDef* domain = m_engine->remoteDomainDef(m_remote.domain());
1956     // Update flags
1957     setFlags(domain->m_flags & StreamFlags);
1958     // Update connection data
1959     if (outgoing() && state() == Idle) {
1960 	m_connectAddr = domain->m_address;
1961 	m_connectPort = domain->m_port;
1962     }
1963     m_engine->unlock();
1964 }
1965 
1966 // Retrieve the first required feature in the list
firstRequiredFeature()1967 XMPPFeature* JBStream::firstRequiredFeature()
1968 {
1969     for (ObjList* o = m_features.skipNull(); o; o = o->skipNext()) {
1970 	XMPPFeature* f = static_cast<XMPPFeature*>(o->get());
1971 	if (f->required())
1972 	    return f;
1973     }
1974     return 0;
1975 }
1976 
1977 // Drop (delete) received XML element
dropXml(XmlElement * & xml,const char * reason)1978 bool JBStream::dropXml(XmlElement*& xml, const char* reason)
1979 {
1980     if (!xml)
1981 	return true;
1982     Debug(this,DebugNote,"Dropping xml=(%p,%s) ns=%s in state=%s reason='%s' [%p]",
1983 	xml,xml->tag(),TelEngine::c_safe(xml->xmlns()),stateName(),reason,this);
1984     TelEngine::destruct(xml);
1985     return true;
1986 }
1987 
1988 // Set stream flag mask
setFlags(int mask)1989 void JBStream::setFlags(int mask)
1990 {
1991 #ifdef XDEBUG
1992     String f;
1993     XMPPUtils::buildFlags(f,mask,s_flagName);
1994     Debug(this,DebugAll,"Setting flags 0x%X (%s) current=0x%X [%p]",
1995 	mask,f.c_str(),m_flags,this);
1996 #endif
1997     m_flags |= mask;
1998 #ifdef DEBUG
1999     if (0 != (mask & StreamCompressed))
2000 	Debug(this,DebugAll,"Stream is using compression [%p]",this);
2001 #endif
2002 }
2003 
2004 // Reset stream flag mask
resetFlags(int mask)2005 void JBStream::resetFlags(int mask)
2006 {
2007 #ifdef XDEBUG
2008     String f;
2009     XMPPUtils::buildFlags(f,mask,s_flagName);
2010     Debug(this,DebugAll,"Resetting flags 0x%X (%s) current=0x%X [%p]",
2011 	mask,f.c_str(),m_flags,this);
2012 #endif
2013     m_flags &= ~mask;
2014 }
2015 
2016 // Set the idle timer in Running state
setIdleTimer(u_int64_t msecNow)2017 void JBStream::setIdleTimer(u_int64_t msecNow)
2018 {
2019     // Set only for non c2s in Running state
2020     if (m_type == c2s || m_type == cluster || m_state != Running ||
2021 	!m_engine->m_idleTimeout)
2022 	return;
2023     m_idleTimeout = msecNow + m_engine->m_idleTimeout;
2024     XDebug(this,DebugAll,"Idle timeout set to " FMT64 "ms [%p]",m_idleTimeout,this);
2025 }
2026 
2027 // Reset ping data
resetPing()2028 void JBStream::resetPing()
2029 {
2030     if (!(m_pingTimeout || m_nextPing))
2031 	return;
2032     XDebug(this,DebugAll,"Reset ping data [%p]",this);
2033     m_nextPing = 0;
2034     m_pingTimeout = 0;
2035 }
2036 
2037 // Set the time of the next ping if there is any timeout and we don't have a ping in progress
2038 // @return XmlElement containing the ping to send, 0 if no ping is going to be sent or 'force' is true
setNextPing(bool force)2039 XmlElement* JBStream::setNextPing(bool force)
2040 {
2041     if (!m_pingInterval) {
2042 	resetPing();
2043 	return 0;
2044     }
2045     if (m_type != c2s && m_type != comp)
2046 	return 0;
2047     if (force) {
2048 	m_nextPing = Time::msecNow() + m_pingInterval;
2049 	m_pingTimeout = 0;
2050 	XDebug(this,DebugAll,"Next ping " FMT64U " [%p]",m_nextPing,this);
2051 	return 0;
2052     }
2053     XmlElement* ping = 0;
2054     if (m_nextPing) {
2055 	// Ping still active in engine ?
2056 	Time time;
2057 	if (m_nextPing > time.msec())
2058 	    return 0;
2059 	if (m_engine->m_pingTimeout) {
2060 	    generateIdIndex(m_pingId,"_ping_");
2061 	    ping = buildPing(m_pingId);
2062 	    if (ping)
2063 		m_pingTimeout = time.msec() + m_engine->m_pingTimeout;
2064 	    else
2065 		m_pingTimeout = 0;
2066 	}
2067 	else
2068 	    resetPing();
2069     }
2070     if (m_pingInterval)
2071 	m_nextPing = Time::msecNow() + m_pingInterval;
2072     else
2073 	m_nextPing = 0;
2074     XDebug(this,DebugAll,"Next ping " FMT64U " ping=%p [%p]",m_nextPing,ping,this);
2075     return ping;
2076 }
2077 
2078 // Process incoming elements in Challenge state
2079 // Return false if stream termination was initiated
processChallenge(XmlElement * xml,const JabberID & from,const JabberID & to)2080 bool JBStream::processChallenge(XmlElement* xml, const JabberID& from, const JabberID& to)
2081 {
2082     int t, n;
2083     if (!XMPPUtils::getTag(*xml,t,n))
2084 	return dropXml(xml,"failed to retrieve element tag");
2085     if (n != XMPPNamespace::Sasl)
2086 	return dropXml(xml,"expecting sasl namespace");
2087     if (t == XmlTag::Abort) {
2088 	TelEngine::destruct(xml);
2089 	TelEngine::destruct(m_sasl);
2090 	XmlElement* rsp = XMPPUtils::createFailure(XMPPNamespace::Sasl,XMPPError::Aborted);
2091 	sendStreamXml(Features,rsp);
2092 	return true;
2093     }
2094     if (t != XmlTag::Response) {
2095 	dropXml(xml,"expecting sasl response");
2096 	return true;
2097     }
2098     XMPPError::Type error = XMPPError::NoError;
2099     // Use a while() to set error and break to the end
2100     while (true) {
2101 	// Decode non empty auth data
2102 	const String& text = xml->getText();
2103 	if (text) {
2104 	    String tmp;
2105 	    if (!decodeBase64(tmp,text,this)) {
2106 		error = XMPPError::IncorrectEnc;
2107 		break;
2108 	    }
2109 	    if (m_sasl && !m_sasl->parseMD5ChallengeRsp(tmp)) {
2110 		error = XMPPError::MalformedRequest;
2111 		break;
2112 	    }
2113 	}
2114 	else if (m_sasl)
2115 	    TelEngine::destruct(m_sasl->m_params);
2116 	break;
2117     }
2118     if (error == XMPPError::NoError) {
2119 	changeState(Auth);
2120 	m_events.append(new JBEvent(JBEvent::Auth,this,xml,from,to));
2121     }
2122     else {
2123 	Debug(this,DebugNote,"Received bad challenge response error='%s' [%p]",
2124 	    XMPPUtils::s_error[error].c_str(),this);
2125 	XmlElement* failure = XMPPUtils::createFailure(XMPPNamespace::Sasl,error);
2126 	sendStreamXml(Features,failure);
2127 	TelEngine::destruct(xml);
2128     }
2129     return true;
2130 }
2131 
2132 // Process incoming 'auth' elements qualified by SASL namespace
2133 // Return false if stream termination was initiated
processSaslAuth(XmlElement * xml,const JabberID & from,const JabberID & to)2134 bool JBStream::processSaslAuth(XmlElement* xml, const JabberID& from, const JabberID& to)
2135 {
2136     if (!xml)
2137 	return true;
2138     if (!XMPPUtils::isTag(*xml,XmlTag::Auth,XMPPNamespace::Sasl))
2139 	return dropXml(xml,"expecting 'auth' in sasl namespace");
2140     XMPPFeatureSasl* sasl = m_features.getSasl();
2141     TelEngine::destruct(m_sasl);
2142     XMPPError::Type error = XMPPError::NoError;
2143     const char* mName = xml->attribute("mechanism");
2144     int mech = XMPPUtils::authMeth(mName);
2145     // Use a while() to set error and break to the end
2146     while (true) {
2147 	if (!sasl->mechanism(mech)) {
2148 	    error = XMPPError::InvalidMechanism;
2149 	    break;
2150 	}
2151 	if (mech == XMPPUtils::AuthMD5) {
2152 	    // Ignore auth text: we will challenge the client
2153 	    m_sasl = new SASL(false,m_local.domain());
2154 	    String buf;
2155 	    if (m_sasl->buildMD5Challenge(buf)) {
2156 		XDebug(this,DebugAll,"Sending challenge=%s [%p]",buf.c_str(),this);
2157 		Base64 b((void*)buf.c_str(),buf.length());
2158 		b.encode(buf);
2159 		XmlElement* chg = XMPPUtils::createElement(XmlTag::Challenge,
2160 		    XMPPNamespace::Sasl,buf);
2161 		if (!sendStreamXml(Challenge,chg)) {
2162 		    TelEngine::destruct(xml);
2163 		    return false;
2164 		}
2165 	    }
2166 	    else {
2167 		TelEngine::destruct(m_sasl);
2168 		error = XMPPError::TempAuthFailure;
2169 		break;
2170 	    }
2171 	}
2172 	else if (mech == XMPPUtils::AuthPlain) {
2173 	    // Decode non empty auth data
2174 	    DataBlock d;
2175 	    const String& text = xml->getText();
2176 	    if (text && text != "=" && !decodeBase64(d,text)) {
2177 		error = XMPPError::IncorrectEnc;
2178 		break;
2179 	    }
2180 	    m_sasl = new SASL(true);
2181 	    if (!m_sasl->parsePlain(d)) {
2182 		error = XMPPError::MalformedRequest;
2183 		break;
2184 	    }
2185 	}
2186 	else {
2187 	    // This should never happen: we don't handle a mechanism sent
2188 	    // to the remote party!
2189 	    Debug(this,DebugWarn,"Unhandled advertised auth mechanism='%s' [%p]",
2190 		mName,this);
2191 	    error = XMPPError::TempAuthFailure;
2192 	    break;
2193 	}
2194 	break;
2195     }
2196     if (error == XMPPError::NoError) {
2197 	// Challenge state: we've challenged the remote party
2198 	// Otherwise: request auth from upper layer
2199 	if (state() == Challenge)
2200 	    TelEngine::destruct(xml);
2201 	else {
2202 	    changeState(Auth);
2203 	    m_events.append(new JBEvent(JBEvent::Auth,this,xml,from,to));
2204 	}
2205     }
2206     else {
2207 	Debug(this,DebugNote,"Received auth request mechanism='%s' error='%s' [%p]",
2208 	    mName,XMPPUtils::s_error[error].c_str(),this);
2209 	XmlElement* failure = XMPPUtils::createFailure(XMPPNamespace::Sasl,error);
2210 	sendStreamXml(m_state,failure);
2211 	TelEngine::destruct(xml);
2212     }
2213     return true;
2214 }
2215 
// Process received elements in Features state (incoming stream)
// Handles, in this order:
//  - component handshake (comp streams)
//  - elements not matching an advertised feature: c2s 'iq' based negotiation
//    (bind, register, non SASL auth), s2s dialback result or the transition to
//    Running state when no required feature is left
//  - TLS 'starttls', SASL 'auth' and stream compression 'compress' requests
// The element is destroyed, handed over to a queued event or passed to
//  another handler
// @param xml Received element (may be 0: nothing is done)
// @param from The sender of the element
// @param to The recipient of the element
// Return false if stream termination was initiated
bool JBStream::processFeaturesIn(XmlElement* xml, const JabberID& from, const JabberID& to)
{
    if (!xml)
	return true;
    const String* t = 0;
    const String* nsName = 0;
    if (!xml->getTag(t,nsName))
	return dropXml(xml,"invalid tag namespace prefix");
    // Namespace index. XMPPNamespace::Count is used when there is no namespace
    int ns = nsName ? XMPPUtils::s_ns[*nsName] : XMPPNamespace::Count;

    // Component: Waiting for handshake in the stream namespace
    if (type() == comp) {
	if (outgoing())
	    return dropXml(xml,"invalid state for incoming stream");
	if (*t != XMPPUtils::s_tag[XmlTag::Handshake] || ns != m_xmlns)
	    return dropXml(xml,"expecting handshake in stream's namespace");
	// Let the upper layer check the handshake (carried in event's text)
	JBEvent* ev = new JBEvent(JBEvent::Auth,this,xml,from,to);
	ev->m_text = xml->getText();
	m_events.append(ev);
	changeState(Auth);
	return true;
    }

    XMPPFeature* f = 0;
    // Stream compression feature and compression namespace are not the same!
    if (ns != XMPPNamespace::Compress)
	f = m_features.get(ns);
    else
	f = m_features.get(XMPPNamespace::CompressFeature);

    // Check if received unexpected feature
    if (!f) {
	// Check for some features that can be negotiated via 'iq' elements
	if (m_type == c2s && *t == XMPPUtils::s_tag[XmlTag::Iq] && ns == m_xmlns) {
	    // Tag and namespace of the first child (if any)
	    int chTag = XmlTag::Count;
	    int chNs = XMPPNamespace::Count;
	    XmlElement* child = xml->findFirstChild();
	    if (child)
		XMPPUtils::getTag(*child,chTag,chNs);
	    // Bind
	    if (chNs == XMPPNamespace::Bind && m_features.get(XMPPNamespace::Bind)) {
		// We've sent bind feature
		// Don't accept it if not authenticated and TLS/SASL must be negotiated
		if (!flag(StreamAuthenticated)) {
		    XMPPFeature* tls = m_features.get(XMPPNamespace::Tls);
		    if (tls && tls->required()) {
			XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeWait,
			    XMPPError::EncryptionRequired);
			sendStreamXml(m_state,e);
			return true;
		    }
		    XMPPFeature* sasl = m_features.get(XMPPNamespace::Sasl);
		    XMPPFeature* iqAuth = m_features.get(XMPPNamespace::IqAuth);
		    if ((sasl && sasl->required()) || (iqAuth && iqAuth->required())) {
			XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeAuth,
			    XMPPError::NotAllowed);
			sendStreamXml(m_state,e);
			return true;
		    }
		}
		// Remove TLS/SASL features from list: they can't be negotiated anymore
		setFlags(StreamSecured | StreamAuthenticated);
		m_features.remove(XMPPNamespace::Tls);
		m_features.remove(XMPPNamespace::Sasl);
		m_features.remove(XMPPNamespace::IqAuth);
		changeState(Running);
		return processRunning(xml,from,to);
	    }
	    else if (chNs == XMPPNamespace::IqRegister) {
		// Register
		m_events.append(new JBEvent(JBEvent::Iq,this,xml,xml->findFirstChild()));
		return true;
	    }
	    else if (chNs == XMPPNamespace::IqAuth) {
		XMPPUtils::IqType type = XMPPUtils::iqType(xml->attribute("type"));
		// Only requests (get/set) are answered with an error
		bool req = type == XMPPUtils::IqGet || type == XMPPUtils::IqSet;
		// Stream non SASL auth
		// Check if we support it
		if (!m_features.get(XMPPNamespace::IqAuth)) {
		    if (req) {
			XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeCancel,
			    XMPPError::NotAllowed);
			return sendStreamXml(m_state,e);
		    }
		    return dropXml(xml,"unexpected jabber:iq:auth element");
		}
		// Required TLS must be negotiated first (checked only if the
		// StreamRemoteVer1 flag is set)
		if (flag(StreamRemoteVer1)) {
		    XMPPFeature* tls = m_features.get(XMPPNamespace::Tls);
		    if (tls && tls->required()) {
			XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeWait,
			    XMPPError::EncryptionRequired);
			sendStreamXml(m_state,e);
			return true;
		    }
		}
		if (chTag != XmlTag::Query) {
		    if (req) {
			XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeModify,
			    XMPPError::FeatureNotImpl);
			sendStreamXml(m_state,e);
			return true;
		    }
		    return dropXml(xml,"expecting iq auth with 'query' child");
		}
		// Send it to the upper layer
		if (type == XMPPUtils::IqSet) {
		    m_events.append(new JBEvent(JBEvent::Auth,this,xml,xml->findFirstChild()));
		    changeState(Auth);
		}
		else
		    m_events.append(new JBEvent(JBEvent::Iq,this,xml,xml->findFirstChild()));
		return true;
	    }
	}
	// s2s waiting for dialback
	if (m_type == s2s) {
	    if (isDbResult(*xml))
		return serverStream()->processDbResult(xml,from,to);
	    // Drop the element if not authenticated
	    if (!flag(StreamAuthenticated))
		return dropXml(xml,"expecting dialback result");
	}
	// Check if all remaining features are optional
	XMPPFeature* req = firstRequiredFeature();
	if (req) {
	    Debug(this,DebugInfo,
		"Received '%s' while having '%s' required feature not negotiated [%p]",
		xml->tag(),req->c_str(),this);
	    // TODO: terminate the stream?
	    return dropXml(xml,"required feature negotiation not completed");
	}
	// No more required features: change state to Running
	// Remove TLS/SASL features from list: they can't be negotiated anymore
	setFlags(StreamSecured | StreamAuthenticated);
	m_features.remove(XMPPNamespace::Tls);
	m_features.remove(XMPPNamespace::Sasl);
	changeState(Running);
	return processRunning(xml,from,to);
    }
    // Stream encryption
    if (ns == XMPPNamespace::Tls) {
	if (*t != XMPPUtils::s_tag[XmlTag::Starttls])
	    return dropXml(xml,"expecting tls 'starttls' element");
	if (!flag(StreamSecured)) {
	    // Change state before trying to send the element
	    // to signal to sendPending() to encrypt the stream after sending it
	    changeState(Securing);
	    sendStreamXml(WaitStart,
		XMPPUtils::createElement(XmlTag::Proceed,XMPPNamespace::Tls));
	}
	else {
	    Debug(this,DebugNote,"Received '%s' element while already secured [%p]",
		xml->tag(),this);
	    // We shouldn't have Starttls in features list
	    // Something went wrong: terminate the stream
	    terminate(0,true,xml,XMPPError::Internal,"Stream already secured");
	    return false;
	}
	TelEngine::destruct(xml);
	return true;
    }
    // Stream SASL auth
    if (ns == XMPPNamespace::Sasl) {
	if (*t != XMPPUtils::s_tag[XmlTag::Auth])
	    return dropXml(xml,"expecting sasl 'auth' element");
	if (!flag(StreamAuthenticated)) {
	    // Check if we must negotiate TLS before authentication
	    XMPPFeature* tls = m_features.get(XMPPNamespace::Tls);
	    if (tls) {
		if (!flag(StreamSecured) && tls->required()) {
		    TelEngine::destruct(xml);
		    XmlElement* failure = XMPPUtils::createFailure(XMPPNamespace::Sasl,
			XMPPError::EncryptionRequired);
		    sendStreamXml(m_state,failure);
		    return true;
		}
		setSecured();
	    }
	}
	else {
	    // Remote party requested authentication while already done:
	    // Reset our flag and let it authenticate again
	    Debug(this,DebugNote,
		"Received auth request while already authenticated [%p]",
		this);
	    resetFlags(StreamAuthenticated);
	}
	return processSaslAuth(xml,from,to);
    }
    // Stream compression
    if (ns == XMPPNamespace::Compress) {
	if (*t != XMPPUtils::s_tag[XmlTag::Compress])
	    return dropXml(xml,"expecting stream compression 'compress' element");
	return handleCompressReq(xml);
    }
    return dropXml(xml,"unhandled stream feature");
}
2415 
// Process received elements in Features state (outgoing stream)
// Expects the stream features element. The features are stored, then the
// next negotiation step is started, in this order: TLS, authentication
// (dialback for server streams, auth or register for client streams),
// compression, resource binding (client streams) or Running state
// (server and cluster streams)
// The element is always destroyed (directly or through dropXml()/destroyDropXml())
// @param xml Received element (may be 0: nothing is done)
// @param from The sender of the element
// @param to The recipient of the element
// Return false if stream termination was initiated
bool JBStream::processFeaturesOut(XmlElement* xml, const JabberID& from,
    const JabberID& to)
{
    if (!xml)
	return true;
    if (!XMPPUtils::isTag(*xml,XmlTag::Features,XMPPNamespace::Stream))
	return dropXml(xml,"expecting stream features");
    m_features.fromStreamFeatures(*xml);
    // Check TLS
    if (!flag(StreamSecured)) {
	XMPPFeature* tls = m_features.get(XMPPNamespace::Tls);
	if (tls) {
	    // Remote offers TLS: start it if the engine can do it
	    if (m_engine->hasClientTls()) {
		TelEngine::destruct(xml);
		XmlElement* x = XMPPUtils::createElement(XmlTag::Starttls,
		    XMPPNamespace::Tls);
		return sendStreamXml(WaitTlsRsp,x);
	    }
	    // We can't do TLS: give up if required by any of the parties
	    if (tls->required() || flag(TlsRequired))
		return destroyDropXml(xml,XMPPError::Internal,
		    "required encryption not available");
	}
	else if (flag(TlsRequired))
	    return destroyDropXml(xml,XMPPError::EncryptionRequired,
		"required encryption not supported by remote");
	// TLS won't be negotiated on this stream
	setFlags(StreamSecured);
    }
    // Check auth
    if (!flag(StreamAuthenticated)) {
	JBServerStream* server = serverStream();
	if (server) {
	    TelEngine::destruct(xml);
	    return server->sendDialback();
	}
	JBClientStream* client = clientStream();
	if (client) {
	    // Start auth or request registration data
	    TelEngine::destruct(xml);
	    if (!flag(RegisterUser))
		return client->startAuth();
	    return client->requestRegister(false);
	}
    }
    // Check compression
    XmlElement* x = checkCompress();
    if (x) {
	TelEngine::destruct(xml);
	return sendStreamXml(Compressing,x);
    }
    // Nothing left to negotiate: bind a resource on client streams
    JBClientStream* client = clientStream();
    if (client) {
	TelEngine::destruct(xml);
	return client->bind();
    }
    // Server and cluster streams are ready to run
    JBServerStream* server = serverStream();
    JBClusterStream* cluster = clusterStream();
    if (server || cluster) {
	TelEngine::destruct(xml);
	changeState(Running);
	return true;
    }
    return dropXml(xml,"incomplete features process for outgoing stream");
}
2481 
2482 // Process received elements in WaitTlsRsp state (outgoing stream)
2483 // The element will be consumed
2484 // Return false if stream termination was initiated
processWaitTlsRsp(XmlElement * xml,const JabberID & from,const JabberID & to)2485 bool JBStream::processWaitTlsRsp(XmlElement* xml, const JabberID& from,
2486     const JabberID& to)
2487 {
2488     if (!xml)
2489 	return true;
2490     int t,n;
2491     const char* reason = 0;
2492     if (XMPPUtils::getTag(*xml,t,n)) {
2493 	if (n == XMPPNamespace::Tls) {
2494 	    // Accept proceed and failure
2495 	    if (t != XmlTag::Proceed && t != XmlTag::Failure)
2496 		reason = "expecting tls 'proceed' or 'failure'";
2497 	}
2498 	else
2499 	    reason = "expecting tls namespace";
2500     }
2501     else
2502 	reason = "failed to retrieve element tag";
2503     if (reason) {
2504 	// TODO: Unacceptable response to starttls request
2505 	// Restart socket read or terminate the stream ?
2506 	socketSetCanRead(true);
2507 	return dropXml(xml,reason);
2508     }
2509     if (t == XmlTag::Proceed) {
2510 	TelEngine::destruct(xml);
2511 	changeState(Securing);
2512 	m_engine->encryptStream(this);
2513 	socketSetCanRead(true);
2514 	setFlags(StreamTls);
2515 	XmlElement* s = buildStreamStart();
2516 	return sendStreamXml(WaitStart,s);
2517     }
2518     // TODO: Implement TLS usage reset if the stream is going to re-connect
2519     terminate(1,false,xml,XMPPError::NoError,"Server can't start TLS");
2520     return false;
2521 }
2522 
2523 // Set stream namespace from type
setXmlns()2524 void JBStream::setXmlns()
2525 {
2526     switch (m_type) {
2527 	case c2s:
2528 	    m_xmlns = XMPPNamespace::Client;
2529 	    break;
2530 	case s2s:
2531 	    m_xmlns = XMPPNamespace::Server;
2532 	    break;
2533 	case comp:
2534 	    m_xmlns = XMPPNamespace::ComponentAccept;
2535 	    break;
2536 	case cluster:
2537 	    m_xmlns = XMPPNamespace::YateCluster;
2538 	    break;
2539     }
2540 }
2541 
2542 // Event termination notification
eventTerminated(const JBEvent * ev)2543 void JBStream::eventTerminated(const JBEvent* ev)
2544 {
2545     if (ev && ev == m_lastEvent) {
2546 	m_lastEvent = 0;
2547 	XDebug(this,DebugAll,"Event (%p,%s) terminated [%p]",ev,ev->name(),this);
2548     }
2549 }
2550 
2551 // Compress data to be sent (the pending stream xml buffer or pending stanza)
2552 // Return false on failure
compress(XmlElementOut * xml)2553 bool JBStream::compress(XmlElementOut* xml)
2554 {
2555     DataBlock& buf = xml ? m_outXmlCompress : m_outStreamXmlCompress;
2556     const String& xmlBuf = xml ? xml->buffer() : m_outStreamXml;
2557     m_socketMutex.lock();
2558     int res = m_compress ? m_compress->compress(xmlBuf.c_str(),xmlBuf.length(),buf) : -1000;
2559     m_socketMutex.unlock();
2560     const char* s = xml ? "pending" : "stream";
2561     if (res >= 0) {
2562 	if ((unsigned int)res == xmlBuf.length()) {
2563 #ifdef JBSTREAM_DEBUG_COMPRESS
2564 	    Debug(this,DebugInfo,"Compressed %s xml %u --> %u [%p]",
2565 		s,xmlBuf.length(),buf.length(),this);
2566 #endif
2567 	    return true;
2568 	}
2569 	Debug(this,DebugNote,"Partially compressed %s xml %d/%u [%p]",
2570 	    s,res,xmlBuf.length(),this);
2571     }
2572     else
2573 	Debug(this,DebugNote,"Failed to compress %s xml: %d [%p]",s,res,this);
2574     return false;
2575 }
2576 
2577 // Reset connect status data
resetConnectStatus()2578 void JBStream::resetConnectStatus()
2579 {
2580     DDebug(this,DebugAll,"resetConnectStatus() [%p]",this);
2581     m_connectStatus = JBConnect::Start;
2582     m_connectSrvs.clear();
2583 }
2584 
2585 // Postpone stream terminate until all parsed elements are processed
2586 // Terminate now if allowed
postponeTerminate(int location,bool destroy,int error,const char * reason)2587 void JBStream::postponeTerminate(int location, bool destroy, int error, const char* reason)
2588 {
2589     lock();
2590     XDebug(this,DebugAll,"postponeTerminate(%d,%u,%s,%s) state=%s [%p]",
2591 	location,destroy,XMPPUtils::s_error[error].c_str(),reason,stateName(),this);
2592     if (!m_ppTerminate) {
2593 	int interval = 0;
2594 	if (type() == c2s)
2595 	    interval = m_engine->m_pptTimeoutC2s;
2596 	else
2597 	    interval = m_engine->m_pptTimeout;
2598 	if (interval && haveData()) {
2599 	    m_ppTerminate = new NamedList("");
2600 	    m_ppTerminate->addParam("location",String(location));
2601 	    m_ppTerminate->addParam("destroy",String::boolText(destroy));
2602 	    m_ppTerminate->addParam("error",String(error));
2603 	    m_ppTerminate->addParam("reason",reason);
2604 	    m_ppTerminateTimeout = Time::msecNow() + interval;
2605 	    Debug(this,DebugInfo,
2606 		"Postponed termination location=%d destroy=%u error=%s reason=%s interval=%us [%p]",
2607 		location,destroy,XMPPUtils::s_error[error].c_str(),reason,interval,this);
2608 	}
2609     }
2610     bool postponed = m_ppTerminate != 0;
2611     unlock();
2612     if (!postponed)
2613 	terminate(location,destroy,0,error,reason);
2614 }
2615 
2616 // Handle postponed termination. Return true if found
postponedTerminate()2617 bool JBStream::postponedTerminate()
2618 {
2619     if (!m_ppTerminate)
2620 	return false;
2621     int location = m_ppTerminate->getIntValue("location");
2622     bool destroy = m_ppTerminate->getBoolValue("destroy");
2623     int error = m_ppTerminate->getIntValue("error");
2624     String reason = m_ppTerminate->getValue("reason");
2625     resetPostponedTerminate();
2626     DDebug(this,DebugAll,"postponedTerminate(%d,%u,%s,%s) state=%s [%p]",
2627 	location,destroy,XMPPUtils::s_error[error].c_str(),reason.c_str(),
2628 	stateName(),this);
2629     terminate(location,destroy,0,error,reason);
2630     return true;
2631 }
2632 
2633 // Reset redirect data
setRedirect(const String & addr,int port)2634 void JBStream::setRedirect(const String& addr, int port)
2635 {
2636     if (!addr) {
2637 	if (m_redirectAddr)
2638 	    Debug(this,DebugInfo,"Cleared redirect data [%p]",this);
2639 	m_redirectAddr = "";
2640 	m_redirectPort = 0;
2641 	return;
2642     }
2643     if (m_redirectCount >= m_redirectMax) {
2644 	setRedirect();
2645 	return;
2646     }
2647     resetConnectStatus();
2648     m_redirectAddr = addr;
2649     m_redirectPort = port;
2650     m_redirectCount++;
2651     Debug(this,DebugInfo,"Set redirect to '%s:%d' in state %s (counter=%u max=%u) [%p]",
2652 	m_redirectAddr.c_str(),m_redirectPort,stateName(),m_redirectCount,m_redirectMax,this);
2653 }
2654 
2655 
2656 /*
2657  * JBClientStream
2658  */
// Build a client (c2s) stream on an already existing socket
// @param engine The engine owning this stream
// @param socket The socket used by the stream
// @param ssl Passed to the JBStream constructor
//  (presumably: the socket is already using SSL/TLS - confirm in JBStream)
// User data and register request type are cleared
JBClientStream::JBClientStream(JBEngine* engine, Socket* socket, bool ssl)
    : JBStream(engine,socket,c2s,ssl),
    m_userData(0), m_registerReq(0)
{
}
2664 
// Build a client (c2s) stream from a JID and an account
// @param engine The engine owning this stream
// @param jid The JID passed to the JBStream constructor. Its domain is
//  passed as second JID
// @param account The account name
// @param params Stream parameters. The 'password' parameter is stored
//  in m_password
// @param name Optional stream name. The account is used if null or empty
// @param serverHost Optional server host passed to the JBStream constructor
JBClientStream::JBClientStream(JBEngine* engine, const JabberID& jid, const String& account,
    const NamedList& params, const char* name, const char* serverHost)
    : JBStream(engine,c2s,jid,jid.domain(),TelEngine::null(name) ? account.c_str() : name,
	&params,serverHost),
    m_account(account), m_userData(0), m_registerReq(0)
{
    m_password = params.getValue("password");
}
2673 
// Build a ping iq stanza
// @param stanzaId The id passed to XMPPUtils::createPing()
// @return The element returned by XMPPUtils::createPing()
XmlElement* JBClientStream::buildPing(const String& stanzaId)
{
    return XMPPUtils::createPing(stanzaId);
}
2679 
2680 // Bind a resource to an incoming stream
bind(const String & resource,const char * id,XMPPError::Type error)2681 void JBClientStream::bind(const String& resource, const char* id, XMPPError::Type error)
2682 {
2683     DDebug(this,DebugAll,"bind(%s,'%s') [%p]",resource.c_str(),
2684 	XMPPUtils::s_error[error].c_str(),this);
2685     Lock lock(this);
2686     if (!incoming() || m_remote.resource())
2687 	return;
2688     XmlElement* xml = 0;
2689     if (resource) {
2690 	m_remote.resource(resource);
2691 	xml = XMPPUtils::createIq(XMPPUtils::IqResult,0,0,id);
2692 	XmlElement* bind = XMPPUtils::createElement(XmlTag::Bind,
2693 	    XMPPNamespace::Bind);
2694 	bind->addChild(XMPPUtils::createElement(XmlTag::Jid,m_remote));
2695 	xml->addChild(bind);
2696     }
2697     else {
2698 	if (error == XMPPError::NoError)
2699 	    error = XMPPError::NotAllowed;
2700 	xml = XMPPUtils::createError(XMPPError::TypeModify,error);
2701     }
2702     // Remove non-negotiable bind feature on success
2703     if (sendStanza(xml) && resource)
2704 	m_features.remove(XMPPNamespace::Bind);
2705 }
2706 
2707 // Request account setup (or info) on outgoing stream
requestRegister(bool data,bool set,const String & newPass)2708 bool JBClientStream::requestRegister(bool data, bool set, const String& newPass)
2709 {
2710     if (incoming())
2711 	return true;
2712 
2713     Lock lock(this);
2714     DDebug(this,DebugAll,"requestRegister(%u,%u) [%p]",data,set,this);
2715     XmlElement* req = 0;
2716     if (data) {
2717 	// Register new user, change the account or remove it
2718 	if (set) {
2719 	    // TODO: Allow user account register/change through unsecured streams ?
2720 	    String* pass = 0;
2721 	    if (!flag(StreamAuthenticated))
2722 		pass = &m_password;
2723 	    else if (newPass) {
2724 		m_newPassword = newPass;
2725 		pass = &m_newPassword;
2726 	    }
2727 	    if (!pass)
2728 		return false;
2729 	    m_registerReq = '2';
2730 	    req = XMPPUtils::createRegisterQuery(0,0,String(m_registerReq),
2731 		m_local.node(),*pass);
2732 	}
2733 	else if (flag(StreamAuthenticated)) {
2734 	    m_registerReq = '3';
2735 	    req = XMPPUtils::createRegisterQuery(XMPPUtils::IqSet,0,0,
2736 		String(m_registerReq),XMPPUtils::createElement(XmlTag::Remove));
2737 	}
2738 	else
2739 	    return false;
2740     }
2741     else {
2742 	// Request register info
2743 	m_registerReq = '1';
2744 	req = XMPPUtils::createRegisterQuery(XMPPUtils::IqGet,0,0,String(m_registerReq));
2745     }
2746     if (!flag(StreamAuthenticated) || state() != Running)
2747 	return sendStreamXml(Register,req);
2748     return sendStanza(req);
2749 }
2750 
2751 // Process elements in Running state
processRunning(XmlElement * xml,const JabberID & from,const JabberID & to)2752 bool JBClientStream::processRunning(XmlElement* xml, const JabberID& from, const JabberID& to)
2753 {
2754     if (!xml)
2755 	return true;
2756     // Check if a resource was bound to an incoming stream
2757     // Accept only 'iq' with bind namespace only if we've sent 'bind' feature
2758     if (incoming()) {
2759 	if (!m_remote.resource()) {
2760 	    if (XMPPUtils::isTag(*xml,XmlTag::Iq,m_xmlns)) {
2761 		XmlElement* child = XMPPUtils::findFirstChild(*xml,XmlTag::Bind,XMPPNamespace::Bind);
2762 		if (child && m_features.get(XMPPNamespace::Bind)) {
2763 		    m_events.append(new JBEvent(JBEvent::Bind,this,xml,from,to,child));
2764 		    return true;
2765 		}
2766 	    }
2767 	    XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeCancel,
2768 		XMPPError::NotAllowed,"No resource bound to the stream");
2769 	    sendStanza(e);
2770 	    return true;
2771 	}
2772     }
2773     else if (m_registerReq && XMPPUtils::isTag(*xml,XmlTag::Iq,m_xmlns) &&
2774 	isRegisterId(*xml) && XMPPUtils::isResponse(*xml))
2775 	return processRegister(xml,from,to);
2776     return JBStream::processRunning(xml,from,to);
2777 }
2778 
2779 // Process received elements in WaitStart state
2780 // WaitStart: Incoming: waiting for stream start
2781 //            Outgoing: idem (our stream start was already sent)
2782 // Return false if stream termination was initiated
processStart(const XmlElement * xml,const JabberID & from,const JabberID & to)2783 bool JBClientStream::processStart(const XmlElement* xml, const JabberID& from,
2784     const JabberID& to)
2785 {
2786     XDebug(this,DebugAll,"JBClientStream::processStart(%s) [%p]",xml->tag(),this);
2787 
2788     // Check element
2789     if (!processStreamStart(xml))
2790 	return false;
2791 
2792     // RFC3920 5.3.1:
2793     // The 'from' attribute must be set for response stream start
2794     if (outgoing()) {
2795 	if (from.null()) {
2796 	    Debug(this,DebugNote,"Received '%s' with empty 'from' [%p]",xml->tag(),this);
2797 	    terminate(0,false,0,XMPPError::BadAddressing,"Missing 'from' attribute");
2798 	    return false;
2799 	}
2800     }
2801     else {
2802 	if (!flag(StreamAuthenticated)) {
2803 	    m_remote.set(from);
2804 	    m_local.set(to);
2805         }
2806     }
2807     m_remote.resource("");
2808     // RFC3920 5.3.1: The 'to' attribute must always be set
2809     // RFC3920: The 'to' attribute is optional
2810     bool validTo = !to.null();
2811     if (validTo) {
2812 	if (outgoing())
2813 	    validTo = (m_local.bare() == to);
2814 	else
2815 	    validTo = engine()->hasDomain(to.domain());
2816     }
2817 #ifdef RFC3920
2818     else
2819 	validTo = outgoing();
2820 #endif
2821     if (!validTo) {
2822 	Debug(this,DebugNote,"Received '%s' with invalid to='%s' [%p]",
2823 	    xml->tag(),to.c_str(),this);
2824 	terminate(0,false,0,
2825 	    to.null() ? XMPPError::BadAddressing : XMPPError::HostUnknown,
2826 	    "Invalid 'to' attribute");
2827 	return false;
2828     }
2829     if (incoming() || flag(StreamRemoteVer1)) {
2830 	m_events.append(new JBEvent(JBEvent::Start,this,0,from,to));
2831 	return true;
2832     }
2833     Debug(this,DebugNote,"Outgoing client stream: unsupported remote version (expecting 1.x)");
2834     terminate(0,true,0,XMPPError::Internal,"Unsupported version");
2835     return false;
2836 }
2837 
// Process elements received in Auth state (outgoing streams only).
// The handler goes through 3 stages, selected by the stream flags:
//  - not authenticated: SASL exchange (plain or digest MD5). On success the
//    stream is restarted (a new stream start is sent, state becomes WaitStart)
//  - StreamWaitBindRsp: waiting for the response to the 'bind_1' request
//  - StreamWaitSessRsp: waiting for the response to the 'sess_1' request
// Return false if stream termination was initiated
bool JBClientStream::processAuth(XmlElement* xml, const JabberID& from,
    const JabberID& to)
{
    if (!xml)
	return true;
    // This state is handled for outgoing streams only
    if (incoming())
	return destroyDropXml(xml,XMPPError::Internal,"invalid state for incoming stream");
    // Retrieve the element's tag (t) and namespace (n)
    int t,n;
    if (!XMPPUtils::getTag(*xml,t,n))
	return destroyDropXml(xml,XMPPError::Internal,"failed to retrieve element tag");

    // Authenticating
    if (!flag(StreamAuthenticated)) {
	// TODO: The server might challenge us again
	//       Implement support for multiple challenge/response steps
	// Only SASL elements are accepted and we must have SASL data to check them
	if (n != XMPPNamespace::Sasl)
	    return destroyDropXml(xml,XMPPError::InvalidNamespace,
		"element with non SASL namespace");
	if (!m_sasl)
	    return destroyDropXml(xml,XMPPError::Internal,"no SASL data");
	// Authentication rejected by the remote party: terminate, pass the element along
	if (t == XmlTag::Failure) {
	    terminate(0,true,xml);
	    return false;
	}
	if (!m_sasl->m_plain) {
	    // Digest MD5
	    if (flag(StreamWaitChallenge)) {
		// Waiting for the challenge: decode and parse it, then send our response
		if (t != XmlTag::Challenge)
		    return destroyDropXml(xml,XMPPError::BadRequest,"expecting challenge");
		String tmp;
		if (!decodeBase64(tmp,xml->getText(),this))
		    return destroyDropXml(xml,XMPPError::IncorrectEnc,
			"challenge with incorrect encoding");
		if (!m_sasl->parseMD5Challenge(tmp))
		    return destroyDropXml(xml,XMPPError::MalformedRequest,
			"invalid challenge format");
		TelEngine::destruct(xml);
		m_sasl->setAuthParams(m_local.node(),m_password);
		// Reuse the buffer for the response, built for 'xmpp/' + our domain
		tmp.clear();
		m_sasl->buildAuthRsp(tmp,"xmpp/" + m_local.domain());
		// Next: wait for the reply to our response
		resetFlags(StreamWaitChallenge);
		setFlags(StreamWaitChgRsp);
		XmlElement* rsp = XMPPUtils::createElement(XmlTag::Response,XMPPNamespace::Sasl,tmp);
		return sendStreamXml(state(),rsp);
	    }
	    // Digest MD5 response reply
	    if (flag(StreamWaitChgRsp)) {
#ifdef RFC3920
		// Expect success or challenge
		// challenge is accepted if not already received one
		if (t != XmlTag::Success && (t != XmlTag::Challenge || flag(StreamRfc3920Chg)))
#else
		// Expect success
		if (t != XmlTag::Success)
#endif
		    return dropXml(xml,"unexpected element");
		// Check the 'rspauth' value carried by the element unless
		//  the StreamRfc3920Chg flag is already set
		if (!flag(StreamRfc3920Chg)) {
		    String rspAuth;
		    if (!decodeBase64(rspAuth,xml->getText(),this))
			return destroyDropXml(xml,XMPPError::IncorrectEnc,
			    "challenge response reply with incorrect encoding");
		    if (!rspAuth.startSkip("rspauth=",false))
			return destroyDropXml(xml,XMPPError::BadFormat,
			    "invalid challenge response reply");
		    if (!m_sasl->validAuthReply(rspAuth))
			return destroyDropXml(xml,XMPPError::InvalidAuth,
			    "incorrect challenge response reply auth");
		}
#ifdef RFC3920
		// Send empty response to challenge
		if (t == XmlTag::Challenge) {
		    setFlags(StreamRfc3920Chg);
		    TelEngine::destruct(xml);
		    XmlElement* rsp = XMPPUtils::createElement(XmlTag::Response,
			XMPPNamespace::Sasl);
		    return sendStreamXml(state(),rsp);
		}
#endif
		// Success received: the digest MD5 exchange is done
		resetFlags(StreamWaitChgRsp | StreamRfc3920Chg);
	    }
	    else
		return dropXml(xml,"unhandled sasl digest md5 state");
	}
	else {
	    // Plain
	    if (t != XmlTag::Success)
		return dropXml(xml,"unexpected element");
	}
	// Authenticated: release the SASL data and restart the stream
	//  (send a new stream start and wait for the remote's one)
	Debug(this,DebugAll,"Authenticated [%p]",this);
	TelEngine::destruct(xml);
	TelEngine::destruct(m_sasl);
	setFlags(StreamAuthenticated);
	XmlElement* start = buildStreamStart();
	return sendStreamXml(WaitStart,start);
    }

    // Authenticated: the remaining stages expect 'iq' responses
    //  matched by type and id
    XMPPUtils::IqType iq = XMPPUtils::iqType(xml->attribute("type"));
    String* id = xml->getAttribute("id");

    // Waiting for bind response
    if (flag(StreamWaitBindRsp)) {
	// Expecting 'iq' result or error
	if (t != XmlTag::Iq ||
	    (iq != XMPPUtils::IqResult && iq != XMPPUtils::IqError) ||
	    !id || *id != "bind_1")
	    return dropXml(xml,"unexpected element");
	if (iq == XMPPUtils::IqError) {
	    Debug(this,DebugNote,"Resource binding failed [%p]",this);
	    terminate(0,true,xml);
	    return false;
	}
	// Check it
	// The loop runs once: 'break' is used to leave the checks early
	bool ok = false;
	while (true) {
	    XmlElement* bind = XMPPUtils::findFirstChild(*xml,XmlTag::Bind,XMPPNamespace::Bind);
	    if (!bind)
		break;
	    XmlElement* tmp = bind->findFirstChild(&XMPPUtils::s_tag[XmlTag::Jid]);
	    if (!tmp)
		break;
	    // The bound JID must be ours (bare JID comparison)
	    JabberID jid(tmp->getText());
	    if (jid.bare() != m_local.bare())
		break;
	    ok = true;
	    // The resource in the response replaces ours if different
	    if (m_local.resource() != jid.resource()) {
		m_local.resource(jid.resource());
		Debug(this,DebugAll,"Resource set to '%s' [%p]",
		    local().resource().c_str(),this);
	    }
	    break;
	}
	if (!ok)
	    return destroyDropXml(xml,XMPPError::UndefinedCondition,
		"unacceptable bind response");
	resetFlags(StreamWaitBindRsp);
	TelEngine::destruct(xml);
	// No session feature: the stream is ready
	if (!m_features.get(XMPPNamespace::Session)) {
	    changeState(Running);
	    return true;
	}
	// Send session
	XmlElement* sess = XMPPUtils::createIq(XMPPUtils::IqSet,0,0,"sess_1");
	sess->addChild(XMPPUtils::createElement(XmlTag::Session,XMPPNamespace::Session));
	setFlags(StreamWaitSessRsp);
	return sendStreamXml(state(),sess);
    }

    // Waiting for session response
    if (flag(StreamWaitSessRsp)) {
	// Expecting 'iq' result or error
	if (t != XmlTag::Iq ||
	    (iq != XMPPUtils::IqResult && iq != XMPPUtils::IqError) ||
	    !id || *id != "sess_1")
	    return dropXml(xml,"unexpected element");
	if (iq == XMPPUtils::IqError) {
	    Debug(this,DebugNote,"Session failed [%p]",this);
	    terminate(0,true,xml);
	    return false;
	}
	// Session established: the stream is ready
	TelEngine::destruct(xml);
	resetFlags(StreamWaitSessRsp);
	changeState(Running);
	return true;
    }

    // Authenticated but not waiting for anything we know of
    return dropXml(xml,"unhandled");
}
3007 
3008 // Process elements in Register state
processRegister(XmlElement * xml,const JabberID & from,const JabberID & to)3009 bool JBClientStream::processRegister(XmlElement* xml, const JabberID& from,
3010     const JabberID& to)
3011 {
3012     if (!xml)
3013 	return true;
3014     int t, ns;
3015     if (!XMPPUtils::getTag(*xml,t,ns))
3016 	return dropXml(xml,"failed to retrieve element tag");
3017     if (t != XmlTag::Iq)
3018 	return dropXml(xml,"expecting 'iq'");
3019     XMPPUtils::IqType iq = XMPPUtils::iqType(xml->attribute("type"));
3020     if (iq != XMPPUtils::IqResult && iq != XMPPUtils::IqError)
3021 	return dropXml(xml,"expecting 'iq' response");
3022     if (!isRegisterId(*xml))
3023 	return dropXml(xml,"unexpected response id");
3024     if (iq == XMPPUtils::IqError) {
3025 	m_events.append(new JBEvent(JBEvent::RegisterFailed,this,xml,from,to));
3026 	// Don't terminate if the user requested account change after authentication
3027 	if (!flag(StreamAuthenticated))
3028 	    terminate(0,true,0,XMPPError::NoError);
3029 	return flag(StreamAuthenticated);
3030     }
3031     // Requested registration data
3032     if (m_registerReq == '1') {
3033 	// XEP-0077: check for username and password children or
3034 	//  instructions
3035 	XmlElement* query = XMPPUtils::findFirstChild(*xml,XmlTag::Query,
3036 	    XMPPNamespace::IqRegister);
3037 	if (query && XMPPUtils::findFirstChild(*query,XmlTag::Username) &&
3038 	    XMPPUtils::findFirstChild(*query,XmlTag::Password)) {
3039 	    TelEngine::destruct(xml);
3040 	    return requestRegister(true);
3041 	}
3042 	m_events.append(new JBEvent(JBEvent::RegisterFailed,this,xml,from,to));
3043 	// Don't terminate if the user requested account change after authentication
3044 	if (!flag(StreamAuthenticated))
3045 	    terminate(0,true,0,XMPPError::NoError);
3046 	return flag(StreamAuthenticated);
3047     }
3048     // Requested registration/change
3049     if (m_registerReq == '2') {
3050 	m_events.append(new JBEvent(JBEvent::RegisterOk,this,xml,from,to));
3051 	// Reset register user flag
3052 	resetFlags(RegisterUser);
3053 	// Done if account changed after authentication
3054 	if (flag(StreamAuthenticated)) {
3055 	    m_password = m_newPassword;
3056 	    return true;
3057 	}
3058 	// Start auth
3059 	changeState(Features);
3060 	return startAuth();
3061     }
3062     // Requested account removal
3063     if (m_registerReq == '3') {
3064 	terminate(0,true,xml,XMPPError::Reg,"Account removed");
3065 	return false;
3066     }
3067     return destroyDropXml(xml,XMPPError::Internal,"unhandled state");
3068 }
3069 
// Object destroyed notification.
// Reset the user data (userData(0)) before calling the base class method
void JBClientStream::destroyed()
{
    userData(0);
    JBStream::destroyed();
}
3076 
3077 // Start outgoing stream authentication
startAuth()3078 bool JBClientStream::startAuth()
3079 {
3080     if (incoming() || state() != Features)
3081 	return false;
3082 
3083     TelEngine::destruct(m_sasl);
3084 
3085     XMPPFeatureSasl* sasl = m_features.getSasl();
3086     if (!sasl) {
3087 	terminate(0,true,0,XMPPError::NoError,"Missing authentication data");
3088 	return false;
3089     }
3090 
3091     // RFC 3920 SASL auth
3092     int mech = XMPPUtils::AuthNone;
3093     if (sasl->mechanism(XMPPUtils::AuthMD5))
3094 	mech = XMPPUtils::AuthMD5;
3095     else if (sasl->mechanism(XMPPUtils::AuthPlain) && flag(AllowPlainAuth))
3096 	mech = XMPPUtils::AuthPlain;
3097     else {
3098 	terminate(0,true,0,XMPPError::NoError,"Unsupported authentication mechanism");
3099 	return false;
3100     }
3101 
3102     m_sasl = new SASL(mech == XMPPUtils::AuthPlain);
3103     String rsp;
3104     if (m_sasl->m_plain) {
3105 	m_sasl->setAuthParams(m_local.node(),m_password);
3106 	if (!m_sasl->buildAuthRsp(rsp)) {
3107 	    terminate(0,true,0,XMPPError::NoError,"Invalid auth data length for plain auth");
3108 	    return false;
3109 	}
3110     }
3111     else
3112 	setFlags(StreamWaitChallenge);
3113     // MD5: send auth element, wait challenge
3114     // Plain auth: send auth element with credentials and wait response (success/failure)
3115     XmlElement* xml = XMPPUtils::createElement(XmlTag::Auth,XMPPNamespace::Sasl,rsp);
3116     xml->setAttribute("mechanism",lookup(mech,XMPPUtils::s_authMeth));
3117     return sendStreamXml(Auth,xml);
3118 }
3119 
3120 // Start resource binding on outgoing stream
bind()3121 bool JBClientStream::bind()
3122 {
3123     Debug(this,DebugAll,"Binding resource [%p]",this);
3124     XmlElement* bind = XMPPUtils::createElement(XmlTag::Bind,XMPPNamespace::Bind);
3125     if (m_local.resource())
3126 	bind->addChild(XMPPUtils::createElement(XmlTag::Resource,m_local.resource()));
3127     XmlElement* b = XMPPUtils::createIq(XMPPUtils::IqSet,0,0,"bind_1");
3128     b->addChild(bind);
3129     setFlags(StreamWaitBindRsp);
3130     return sendStreamXml(Auth,b);
3131 }
3132 
3133 
3134 /*
3135  * JBServerStream
3136  */
// Build a stream on top of an existing socket.
// The stream type is 'comp' if 'component' is true, 's2s' otherwise.
// The remote domains list starts empty and there is no dialback key
JBServerStream::JBServerStream(JBEngine* engine, Socket* socket, bool component)
    : JBStream(engine,socket,component ? comp : s2s),
    m_remoteDomains(""), m_dbKey(0)
{
}
3143 
3144 // Build an outgoing stream
JBServerStream(JBEngine * engine,const JabberID & local,const JabberID & remote,const char * dbId,const char * dbKey,bool dbOnly,const NamedList * params)3145 JBServerStream::JBServerStream(JBEngine* engine, const JabberID& local,
3146     const JabberID& remote, const char* dbId, const char* dbKey, bool dbOnly,
3147     const NamedList* params)
3148     : JBStream(engine,s2s,local,remote,0,params),
3149     m_remoteDomains(""), m_dbKey(0)
3150 {
3151     if (!(TelEngine::null(dbId) || TelEngine::null(dbKey)))
3152 	m_dbKey = new NamedString(dbId,dbKey);
3153     if (dbOnly)
3154 	setFlags(DialbackOnly | NoAutoRestart);
3155 }
3156 
3157 // Constructor. Build an outgoing component stream
JBServerStream(JBEngine * engine,const JabberID & local,const JabberID & remote,const String * name,const NamedList * params)3158 JBServerStream::JBServerStream(JBEngine* engine, const JabberID& local, const JabberID& remote,
3159     const String* name, const NamedList* params)
3160     : JBStream(engine,comp,local,remote,name ? name->c_str() : 0,params),
3161     m_remoteDomains(""), m_dbKey(0)
3162 {
3163     if (params)
3164 	m_password = params->getValue("password");
3165 }
3166 
3167 // Send a dialback verify response
sendDbVerify(const char * from,const char * to,const char * id,XMPPError::Type rsp)3168 bool JBServerStream::sendDbVerify(const char* from, const char* to, const char* id,
3169     XMPPError::Type rsp)
3170 {
3171     adjustDbRsp(rsp);
3172     XmlElement* result = XMPPUtils::createDialbackVerifyRsp(from,to,id,rsp);
3173     DDebug(this,DebugAll,"Sending '%s' db:verify response from %s to %s [%p]",
3174 	result->attribute("type"),from,to,this);
3175     return state() < Running ? sendStreamXml(state(),result) : sendStanza(result);
3176 }
3177 
3178 // Send a dialback key response. Update the remote domains list
3179 // Terminate the stream if there are no more remote domains
sendDbResult(const JabberID & from,const JabberID & to,XMPPError::Type rsp)3180 bool JBServerStream::sendDbResult(const JabberID& from, const JabberID& to, XMPPError::Type rsp)
3181 {
3182     Lock lock(this);
3183     // Check local domain
3184     if (m_local != from)
3185 	return false;
3186     // Respond only to received requests
3187     NamedString* p = m_remoteDomains.getParam(to);
3188     if (!p)
3189 	return false;
3190     bool valid = rsp == XMPPError::NoError;
3191     // Don't deny already authenticated requests
3192     if (p->null() && !valid)
3193 	return false;
3194     // Set request state or remove it if not accepted
3195     if (valid)
3196 	p->clear();
3197     else
3198 	m_remoteDomains.clearParam(to);
3199     bool ok = false;
3200     adjustDbRsp(rsp);
3201     XmlElement* result = XMPPUtils::createDialbackResult(from,to,rsp);
3202     DDebug(this,DebugAll,"Sending '%s' db:result response from %s to %s [%p]",
3203 	result->attribute("type"),from.c_str(),to.c_str(),this);
3204     if (m_state < Running) {
3205 	ok = sendStreamXml(Running,result);
3206 	// Remove features and set the authenticated flag
3207 	if (ok && valid) {
3208 	    m_features.remove(XMPPNamespace::Sasl);
3209 	    m_features.remove(XMPPNamespace::IqAuth);
3210 	    setFlags(StreamAuthenticated);
3211 	    // Compression can still be set
3212 	    if (!flag(StreamCompressed) && m_features.get(XMPPNamespace::CompressFeature))
3213 		setFlags(StreamCanCompress);
3214 	    else
3215 		resetFlags(StreamCanCompress);
3216 	}
3217     }
3218     else if (m_state == Running)
3219 	ok = sendStanza(result);
3220     else
3221 	TelEngine::destruct(result);
3222     // Terminate the stream if there are no more remote domains
3223     if (!m_remoteDomains.count())
3224 	terminate(-1,true,0,rsp);
3225     return ok;
3226 }
3227 
3228 // Send dialback data (key/verify)
sendDialback()3229 bool JBServerStream::sendDialback()
3230 {
3231     State newState = Running;
3232     XmlElement* result = 0;
3233     if (!flag(DialbackOnly)) {
3234 	if (flag(StreamAuthenticated))
3235 	    newState = Running;
3236 	else {
3237 	    String key;
3238 	    engine()->buildDialbackKey(id(),m_local,m_remote,key);
3239 	    result = XMPPUtils::createDialbackKey(m_local,m_remote,key);
3240 	    newState = Auth;
3241 	}
3242     }
3243     else if (!m_dbKey) {
3244 	// Dialback only with no key?
3245 	Debug(this,DebugNote,"Outgoing dialback stream with no key! [%p]",this);
3246 	terminate(0,true,0,XMPPError::Internal);
3247 	return false;
3248     }
3249     if (m_dbKey) {
3250 	XmlElement* db = XMPPUtils::createDialbackVerify(m_local,m_remote,
3251 	    m_dbKey->name(),*m_dbKey);
3252 	if (result)
3253 	    return sendStreamXml(newState,result,db);
3254 	return sendStreamXml(newState,db);
3255     }
3256     if (result)
3257 	return sendStreamXml(newState,result);
3258     changeState(newState);
3259     return true;
3260 }
3261 
// Object destroyed notification.
// Release the dialback key before calling the base class method
void JBServerStream::destroyed()
{
    TelEngine::destruct(m_dbKey);
    JBStream::destroyed();
}
3268 
3269 // Process elements in Running state
processRunning(XmlElement * xml,const JabberID & from,const JabberID & to)3270 bool JBServerStream::processRunning(XmlElement* xml, const JabberID& from,
3271     const JabberID& to)
3272 {
3273     if (!xml)
3274 	return true;
3275     // Incoming, authenticated stream which might still request compression
3276     // Any other element will reset compression offer
3277     if (flag(StreamCanCompress)) {
3278 	if (incoming() && !flag(StreamCompressed) &&
3279 	    m_features.get(XMPPNamespace::CompressFeature)) {
3280 	    int t = XmlTag::Count;
3281 	    int n = XMPPNamespace::Count;
3282 	    XMPPUtils::getTag(*xml,t,n);
3283 	    if (t == XmlTag::Compress && n == XMPPNamespace::Compress)
3284 		return handleCompressReq(xml);
3285 	}
3286 	resetFlags(StreamCanCompress);
3287 	m_features.remove(XMPPNamespace::CompressFeature);
3288     }
3289     // Check the tags of known dialback elements:
3290     //  there are servers who don't stamp them with the namespace
3291     // Let other elements stamped with dialback namespace go the upper layer
3292     if (type() != comp && isDbResult(*xml)) {
3293 	if (outgoing())
3294 	    return dropXml(xml,"dialback result on outgoing stream");
3295 	return processDbResult(xml,from,to);
3296     }
3297     // Call default handler
3298     return JBStream::processRunning(xml,from,to);
3299 }
3300 
3301 // Build a stream start XML element
buildStreamStart()3302 XmlElement* JBServerStream::buildStreamStart()
3303 {
3304     XmlElement* start = new XmlElement(XMPPUtils::s_tag[XmlTag::Stream],false);
3305     if (incoming())
3306 	start->setAttribute("id",m_id);
3307     XMPPUtils::setStreamXmlns(*start);
3308     start->setAttribute(XmlElement::s_ns,XMPPUtils::s_ns[m_xmlns]);
3309     if (type() == s2s) {
3310 	start->setAttribute(XmlElement::s_nsPrefix + "db",XMPPUtils::s_ns[XMPPNamespace::Dialback]);
3311 	if (!dialback()) {
3312 	    start->setAttributeValid("from",m_local.bare());
3313 	    start->setAttributeValid("to",m_remote.bare());
3314 	    if (outgoing() || flag(StreamLocalVer1))
3315 		start->setAttribute("version","1.0");
3316 	    start->setAttribute("xml:lang","en");
3317 	}
3318     }
3319     else if (type() == comp) {
3320 	if (incoming())
3321 	    start->setAttributeValid("from",m_remote.domain());
3322 	else
3323 	    start->setAttributeValid("to",m_local.domain());
3324     }
3325     return start;
3326 }
3327 
3328 // Process received elements in WaitStart state
3329 // WaitStart: Incoming: waiting for stream start
3330 //            Outgoing: idem (our stream start was already sent)
3331 // Return false if stream termination was initiated
processStart(const XmlElement * xml,const JabberID & from,const JabberID & to)3332 bool JBServerStream::processStart(const XmlElement* xml, const JabberID& from,
3333     const JabberID& to)
3334 {
3335     XDebug(this,DebugAll,"JBServerStream::processStart() [%p]",this);
3336 
3337     if (!processStreamStart(xml))
3338 	return false;
3339 
3340     if (type() == comp) {
3341 	String from = xml->attribute("from");
3342 	if (m_local == from) {
3343 	    changeState(Starting);
3344 	    m_events.append(new JBEvent(JBEvent::Start,this,0,to,JabberID::empty()));
3345 	}
3346 	else
3347 	    terminate(0,false,0,XMPPError::InvalidFrom);
3348 	return false;
3349     }
3350 
3351     if (outgoing()) {
3352 	m_events.append(new JBEvent(JBEvent::Start,this,0,from,to));
3353 	return true;
3354     }
3355 
3356     // Incoming stream
3357     m_local = to;
3358     if (m_local && !engine()->hasDomain(m_local)) {
3359 	terminate(0,true,0,XMPPError::HostUnknown);
3360 	return false;
3361     }
3362     updateFromRemoteDef();
3363     m_events.append(new JBEvent(JBEvent::Start,this,0,from,to));
3364     return true;
3365 }
3366 
3367 // Process elements in Auth state
processAuth(XmlElement * xml,const JabberID & from,const JabberID & to)3368 bool JBServerStream::processAuth(XmlElement* xml, const JabberID& from,
3369     const JabberID& to)
3370 {
3371     if (incoming())
3372 	return dropXml(xml,"invalid state for incoming stream");
3373     // Component
3374     if (type() == comp) {
3375 	int t,n;
3376 	if (!XMPPUtils::getTag(*xml,t,n))
3377 	    return destroyDropXml(xml,XMPPError::Internal,"failed to retrieve element tag");
3378 	if (t != XmlTag::Handshake || n != m_xmlns)
3379 	    return dropXml(xml,"expecting handshake in stream's namespace");
3380 	// Stream authenticated
3381 	TelEngine::destruct(xml);
3382 	setFlags(StreamAuthenticated);
3383 	changeState(Running);
3384 	Debug(this,DebugAll,"Authenticated [%p]",this);
3385 	return true;
3386     }
3387     // Waiting for db:result
3388     if (!isDbResult(*xml))
3389 	return dropXml(xml,"expecting dialback result");
3390     // Result
3391     // Outgoing stream waiting for dialback key response
3392     if (outgoing()) {
3393 	if (m_remote != from || m_local != to)
3394 	    return destroyDropXml(xml,XMPPError::BadAddressing,
3395 		"dialback response with invalid 'from'");
3396 	// Expect dialback key response
3397 	int rsp = XMPPUtils::decodeDbRsp(xml);
3398 	if (rsp != XMPPError::NoError) {
3399 	    terminate(1,false,xml,rsp);
3400 	    return false;
3401 	}
3402 	// Stream authenticated
3403 	TelEngine::destruct(xml);
3404 	setFlags(StreamAuthenticated);
3405 	// Check compression
3406 	XmlElement* x = checkCompress();
3407 	if (x)
3408 	    return sendStreamXml(Compressing,x);
3409 	changeState(Running);
3410 	return true;
3411     }
3412     return dropXml(xml,"incomplete state process");
3413 }
3414 
3415 // Start the stream (reply to received stream start)
startComp(const String & local,const String & remote)3416 bool JBServerStream::startComp(const String& local, const String& remote)
3417 {
3418     if (state() != Starting || type() != comp)
3419 	return false;
3420     Lock lock(this);
3421     XmlElement* s = 0;
3422     if (incoming()) {
3423 	m_local.set(local);
3424 	m_remote.set(remote);
3425 	s = buildStreamStart();
3426     }
3427     else {
3428 	String digest;
3429 	buildSha1Digest(digest,m_password);
3430 	s = XMPPUtils::createElement(XmlTag::Handshake,digest);
3431     }
3432     setSecured();
3433     return sendStreamXml(incoming() ? Features : Auth,s);
3434 }
3435 
3436 // Process dialback key (db:result) requests
processDbResult(XmlElement * xml,const JabberID & from,const JabberID & to)3437 bool JBServerStream::processDbResult(XmlElement* xml, const JabberID& from,
3438     const JabberID& to)
3439 {
3440     // Check TLS when stream:features were sent
3441     if (m_state == Features) {
3442 	if (flag(TlsRequired) && !flag(StreamSecured))
3443 	    return destroyDropXml(xml,XMPPError::EncryptionRequired,
3444 		"required encryption not supported by remote");
3445 	// TLS can't be negotiated anymore
3446 	setFlags(StreamSecured);
3447     }
3448     // Check remote domain
3449     if (!from)
3450 	return destroyDropXml(xml,XMPPError::BadAddressing,
3451 	    "dialback result with empty 'from' domain");
3452     // Accept non empty key only
3453     const char* key = xml->getText();
3454     if (TelEngine::null(key))
3455 	return destroyDropXml(xml,XMPPError::NotAcceptable,
3456 	    "dialback result with empty key");
3457     // Check local domain
3458     if (!(to && engine()->hasDomain(to))) {
3459 	const char* reason = "dialback result with unknown 'to' domain";
3460 	dropXml(xml,reason);
3461 	XmlElement* rsp = XMPPUtils::createDialbackResult(to,from,XMPPError::ItemNotFound);
3462 	if (m_state < Running)
3463 	    sendStreamXml(state(),rsp);
3464 	else
3465 	    sendStanza(rsp);
3466 	return false;
3467     }
3468     if (!m_local)
3469 	m_local = to;
3470     else if (m_local != to)
3471 	return destroyDropXml(xml,XMPPError::NotAcceptable,
3472 	    "dialback result with incorrect 'to' domain");
3473     // Ignore duplicate requests
3474     if (m_remoteDomains.getParam(from)) {
3475 	dropXml(xml,"duplicate dialback key request");
3476 	return false;
3477     }
3478     m_remoteDomains.addParam(from,key);
3479     DDebug(this,DebugAll,"Added db:result request from %s [%p]",from.c_str(),this);
3480     // Notify the upper layer of incoming request
3481     JBEvent* ev = new JBEvent(JBEvent::DbResult,this,xml,from,to);
3482     ev->m_text = key;
3483     m_events.append(ev);
3484     return true;
3485 }
3486 
3487 
3488 /*
3489  * JBClusterStream
3490  */
// Build a cluster stream on top of an existing socket.
// Engine and socket are forwarded to the base class along with the 'cluster' type
JBClusterStream::JBClusterStream(JBEngine* engine, Socket* socket)
    : JBStream(engine,socket,cluster)
{
}
3496 
// Build a cluster stream from local and remote parties.
// All arguments are forwarded to the base class along with the 'cluster' type,
//  no stream name is given
JBClusterStream::JBClusterStream(JBEngine* engine, const JabberID& local,
    const JabberID& remote, const NamedList* params)
    : JBStream(engine,cluster,local,remote,0,params)
{
}
3503 
3504 // Build a stream start XML element
buildStreamStart()3505 XmlElement* JBClusterStream::buildStreamStart()
3506 {
3507     XmlElement* start = new XmlElement(XMPPUtils::s_tag[XmlTag::Stream],false);
3508     if (incoming())
3509 	start->setAttribute("id",m_id);
3510     XMPPUtils::setStreamXmlns(*start);
3511     start->setAttribute(XmlElement::s_ns,XMPPUtils::s_ns[m_xmlns]);
3512     start->setAttributeValid("from",m_local);
3513     start->setAttributeValid("to",m_remote);
3514     start->setAttribute("version","1.0");
3515     start->setAttribute("xml:lang","en");
3516     return start;
3517 }
3518 
3519 // Process received elements in WaitStart state
3520 // WaitStart: Incoming: waiting for stream start
3521 //            Outgoing: idem (our stream start was already sent)
3522 // Return false if stream termination was initiated
processStart(const XmlElement * xml,const JabberID & from,const JabberID & to)3523 bool JBClusterStream::processStart(const XmlElement* xml, const JabberID& from,
3524     const JabberID& to)
3525 {
3526     XDebug(this,DebugAll,"JBClusterStream::processStart() [%p]",this);
3527     if (!processStreamStart(xml))
3528 	return false;
3529     // Check from/to
3530     bool ok = true;
3531     if (outgoing())
3532 	ok = (m_local == to) && (m_remote == from);
3533     else {
3534 	if (!m_remote) {
3535 	    m_local = to;
3536 	    m_remote = from;
3537 	    ok = from && to;
3538 	}
3539 	else
3540 	    ok = (m_local == to) && (m_remote == from);
3541     }
3542     if (!ok) {
3543 	Debug(this,DebugNote,"Got invalid from='%s' or to='%s' in stream start [%p]",
3544 	    from.c_str(),to.c_str(),this);
3545 	terminate(0,true,0,XMPPError::BadAddressing);
3546 	return false;
3547     }
3548     m_events.append(new JBEvent(JBEvent::Start,this,0,m_remote,m_local));
3549     return true;
3550 }
3551 
3552 // Process elements in Running state
processRunning(XmlElement * xml,const JabberID & from,const JabberID & to)3553 bool JBClusterStream::processRunning(XmlElement* xml, const JabberID& from, const JabberID& to)
3554 {
3555     if (!xml)
3556 	return true;
3557     int t, ns;
3558     if (!XMPPUtils::getTag(*xml,t,ns))
3559 	return dropXml(xml,"failed to retrieve element tag");
3560     JBEvent::Type evType = JBEvent::Unknown;
3561     XmlElement* child = 0;
3562     switch (t) {
3563 	case XmlTag::Iq:
3564 	    checkPing(this,xml,m_pingId);
3565 	    evType = JBEvent::Iq;
3566 	    child = xml->findFirstChild();
3567 	    break;
3568 	case XmlTag::Message:
3569 	    evType = JBEvent::Message;
3570 	    break;
3571 	case XmlTag::Presence:
3572 	    evType = JBEvent::Presence;
3573 	    break;
3574 	default: ;
3575     }
3576     m_events.append(new JBEvent(evType,this,xml,m_remote,m_local,child));
3577     return true;
3578 }
3579 
3580 /* vi: set ts=8 sw=4 sts=4 noet: */
3581