1 /**
2 * jbstream.cpp
3 * Yet Another Jabber Component Protocol Stack
4 * This file is part of the YATE Project http://YATE.null.ro
5 *
6 * Yet Another Telephony Engine - a fully featured software PBX and IVR
7 * Copyright (C) 2004-2014 Null Team
8 *
9 * This software is distributed under multiple licenses;
10 * see the COPYING file in the main directory for licensing
11 * information for this specific distribution.
12 *
13 * This use of this software may be subject to additional restrictions.
14 * See the LEGAL file in the main directory for details.
15 *
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
19 */
20
21 #include <yatejabber.h>
22 #include <stdlib.h>
23
24 using namespace TelEngine;
25
26 #ifdef XDEBUG
27 #define JBSTREAM_DEBUG_COMPRESS
28 #define JBSTREAM_DEBUG_SOCKET
29 #else
30 // #define JBSTREAM_DEBUG_COMPRESS // Show (de)compress debug
31 // #define JBSTREAM_DEBUG_SOCKET // Show socket read/write debug
32 #endif
33
34 static const String s_dbVerify = "verify";
35 static const String s_dbResult = "result";
36
isDbVerify(XmlElement & xml)37 static inline bool isDbVerify(XmlElement& xml)
38 {
39 const String* tag = 0;
40 const String* ns = 0;
41 return xml.getTag(tag,ns) && *tag == s_dbVerify &&
42 ns && *ns == XMPPUtils::s_ns[XMPPNamespace::Dialback];
43 }
44
isDbResult(XmlElement & xml)45 static inline bool isDbResult(XmlElement& xml)
46 {
47 const String* tag = 0;
48 const String* ns = 0;
49 return xml.getTag(tag,ns) && *tag == s_dbResult &&
50 ns && *ns == XMPPUtils::s_ns[XMPPNamespace::Dialback];
51 }
52
53 // Decode a Base64 string to a block
decodeBase64(DataBlock & buf,const String & str)54 static inline bool decodeBase64(DataBlock& buf, const String& str)
55 {
56 Base64 b((void*)str.c_str(),str.length(),false);
57 bool ok = b.decode(buf,false);
58 b.clear(false);
59 return ok;
60 }
61
62 // Decode a Base64 string to another string
63 // Check if decoded data has valid UTF8 characters
decodeBase64(String & buf,const String & str,JBStream * stream)64 static bool decodeBase64(String& buf, const String& str, JBStream* stream)
65 {
66 DataBlock d;
67 if (!decodeBase64(d,str))
68 return false;
69 buf.assign((const char*)d.data(),d.length());
70 if (-1 != buf.lenUtf8())
71 return true;
72 Debug(stream,DebugNote,"Received Base64 encoded invalid UTF8 characters [%p]",stream);
73 return false;
74 }
75
76 #ifdef DEBUG
checkPing(JBStream * stream,const XmlElement * xml,const String & pingId)77 static bool checkPing(JBStream* stream, const XmlElement* xml, const String& pingId)
78 {
79 if (!(stream && xml && pingId))
80 return false;
81 if (pingId != xml->getAttribute(YSTRING("id")))
82 return false;
83 const char* it = xml->attribute(YSTRING("type"));
84 XMPPUtils::IqType iqType = XMPPUtils::iqType(it);
85 bool ok = (iqType == XMPPUtils::IqResult || iqType == XMPPUtils::IqError);
86 if (ok)
87 Debug(stream,DebugAll,"Ping with id=%s confirmed by '%s' [%p]",pingId.c_str(),it,stream);
88 return ok;
89 }
90 #else
checkPing(JBStream * stream,const XmlElement * xml,const String & pingId)91 static inline bool checkPing(JBStream* stream, const XmlElement* xml, const String& pingId)
92 {
93 return false;
94 }
95 #endif
96
97 static const TokenDict s_location[] = {
98 {"internal", 0},
99 {"remote", 1},
100 {"local", -1},
101 {0,0},
102 };
103
104 const TokenDict JBStream::s_stateName[] = {
105 {"Running", Running},
106 {"Idle", Idle},
107 {"Connecting", Connecting},
108 {"WaitStart", WaitStart},
109 {"Starting", Starting},
110 {"Features", Features},
111 {"WaitTlsRsp", WaitTlsRsp},
112 {"Auth", Auth},
113 {"Challenge", Challenge},
114 {"Securing", Securing},
115 {"Compressing", Compressing},
116 {"Register", Register},
117 {"Destroy", Destroy},
118 {0,0},
119 };
120
121 const TokenDict JBStream::s_flagName[] = {
122 {"noautorestart", NoAutoRestart},
123 {"tlsrequired", TlsRequired},
124 {"dialback", DialbackOnly},
125 {"allowplainauth", AllowPlainAuth},
126 {"register", RegisterUser},
127 {"compress", Compress},
128 {"error", InError},
129 // Internal flags
130 {"roster_requested", RosterRequested},
131 {"online", AvailableResource},
132 {"secured", StreamTls | StreamSecured},
133 {"encrypted", StreamTls},
134 {"authenticated", StreamAuthenticated},
135 {"waitbindrsp", StreamWaitBindRsp},
136 {"waitsessrsp", StreamWaitSessRsp},
137 {"waitchallenge", StreamWaitChallenge},
138 {"waitchallengersp", StreamWaitChgRsp},
139 {"version1", StreamRemoteVer1},
140 {"compressed", StreamCompressed},
141 {"cancompress", StreamCanCompress},
142 {0,0}
143 };
144
145 const TokenDict JBStream::s_typeName[] = {
146 {"c2s", c2s},
147 {"s2s", s2s},
148 {"comp", comp},
149 {"cluster", cluster},
150 {0,0}
151 };
152
153 // Retrieve the multiplier for non client stream timers
timerMultiplier(JBStream * stream)154 static inline unsigned int timerMultiplier(JBStream* stream)
155 {
156 return stream->type() == JBStream::c2s ? 1 : 2;
157 }
158
159
160 /*
161 * JBStream
162 */
163 // Incoming
JBStream(JBEngine * engine,Socket * socket,Type t,bool ssl)164 JBStream::JBStream(JBEngine* engine, Socket* socket, Type t, bool ssl)
165 : Mutex(true,"JBStream"),
166 m_sasl(0),
167 m_state(Idle), m_flags(0), m_xmlns(XMPPNamespace::Count), m_lastEvent(0),
168 m_setupTimeout(0), m_startTimeout(0),
169 m_pingTimeout(0), m_pingInterval(0), m_nextPing(0),
170 m_idleTimeout(0), m_connectTimeout(0),
171 m_restart(0), m_timeToFillRestart(0),
172 m_engine(engine), m_type(t),
173 m_incoming(true), m_terminateEvent(0), m_ppTerminate(0), m_ppTerminateTimeout(0),
174 m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
175 m_connectPort(0), m_compress(0), m_connectStatus(JBConnect::Start),
176 m_redirectMax(0), m_redirectCount(0), m_redirectPort(0)
177 {
178 if (ssl)
179 setFlags(StreamSecured | StreamTls);
180 m_engine->buildStreamName(m_name,this);
181 debugName(m_name);
182 debugChain(m_engine);
183 Debug(this,DebugAll,"JBStream::JBStream(%p,%p,%s,%s) incoming [%p]",
184 engine,socket,typeName(),String::boolText(ssl),this);
185 setXmlns();
186 // Don't restart incoming streams
187 setFlags(NoAutoRestart);
188 resetConnection(socket);
189 changeState(WaitStart);
190 }
191
192 // Outgoing
JBStream(JBEngine * engine,Type t,const JabberID & local,const JabberID & remote,const char * name,const NamedList * params,const char * serverHost)193 JBStream::JBStream(JBEngine* engine, Type t, const JabberID& local, const JabberID& remote,
194 const char* name, const NamedList* params, const char* serverHost)
195 : Mutex(true,"JBStream"),
196 m_sasl(0),
197 m_state(Idle), m_local(local), m_remote(remote), m_serverHost(serverHost),
198 m_flags(0), m_xmlns(XMPPNamespace::Count), m_lastEvent(0), m_stanzaIndex(0),
199 m_setupTimeout(0), m_startTimeout(0),
200 m_pingTimeout(0), m_nextPing(0),
201 m_idleTimeout(0), m_connectTimeout(0),
202 m_restart(1), m_timeToFillRestart(0),
203 m_engine(engine), m_type(t),
204 m_incoming(false), m_name(name),
205 m_terminateEvent(0), m_ppTerminate(0), m_ppTerminateTimeout(0),
206 m_xmlDom(0), m_socket(0), m_socketFlags(0), m_socketMutex(true,"JBStream::Socket"),
207 m_connectPort(0), m_compress(0), m_connectStatus(JBConnect::Start),
208 m_redirectMax(engine->redirectMax()), m_redirectCount(0), m_redirectPort(0)
209 {
210 if (!m_name)
211 m_engine->buildStreamName(m_name,this);
212 debugName(m_name);
213 debugChain(m_engine);
214 if (params) {
215 int flgs = XMPPUtils::decodeFlags(params->getValue("options"),s_flagName);
216 setFlags(flgs & StreamFlags);
217 m_connectAddr = params->getValue("server",params->getValue("address"));
218 m_connectPort = params->getIntValue("port");
219 m_localIp = params->getValue("localip");
220 }
221 else
222 updateFromRemoteDef();
223 // Compress always defaults to true if not explicitly disabled
224 if (!flag(Compress) && !(params && params->getBoolValue("nocompression")))
225 setFlags(Compress);
226 Debug(this,DebugAll,"JBStream::JBStream(%p,%s,%s,%s,%s) outgoing [%p]",
227 engine,typeName(),local.c_str(),remote.c_str(),m_serverHost.safe(),this);
228 setXmlns();
229 changeState(Idle);
230 }
231
232 // Destructor
~JBStream()233 JBStream::~JBStream()
234 {
235 DDebug(this,DebugAll,"JBStream::~JBStream() id=%s [%p]",m_name.c_str(),this);
236 TelEngine::destruct(m_sasl);
237 }
238
239 // Outgoing stream connect terminated notification.
connectTerminated(Socket * & sock)240 void JBStream::connectTerminated(Socket*& sock)
241 {
242 Lock lock(this);
243 if (m_state == Connecting) {
244 if (sock) {
245 resetConnection(sock);
246 sock = 0;
247 changeState(Starting);
248 XmlElement* s = buildStreamStart();
249 sendStreamXml(WaitStart,s);
250 }
251 else {
252 DDebug(this,DebugNote,"Connect failed [%p]",this);
253 resetConnectStatus();
254 setRedirect();
255 m_redirectCount = 0;
256 terminate(0,false,0,XMPPError::NoRemote);
257 }
258 return;
259 }
260 DDebug(this,DebugInfo,"Connect terminated notification in non %s state [%p]",
261 lookup(Connecting,s_stateName),this);
262 if (sock) {
263 delete sock;
264 sock = 0;
265 }
266 }
267
268 // Connecting notification. Start connect timer for synchronous connect
connecting(bool sync,int stat,ObjList & srvs)269 bool JBStream::connecting(bool sync, int stat, ObjList& srvs)
270 {
271 if (incoming() || !m_engine || state() != Connecting)
272 return false;
273 Lock lock(this);
274 if (state() != Connecting)
275 return false;
276 m_connectStatus = stat;
277 SrvRecord::copy(m_connectSrvs,srvs);
278 if (sync) {
279 if (stat != JBConnect::Srv)
280 m_connectTimeout = Time::msecNow() + m_engine->m_connectTimeout;
281 else
282 m_connectTimeout = Time::msecNow() + m_engine->m_srvTimeout;
283 }
284 else
285 m_connectTimeout = 0;
286 DDebug(this,DebugAll,"Connecting sync=%u stat=%s [%p]",
287 sync,lookup(m_connectStatus,JBConnect::s_statusName),this);
288 return true;
289 }
290
291 // Get an object from this stream
getObject(const String & name) const292 void* JBStream::getObject(const String& name) const
293 {
294 if (name == "Socket*")
295 return state() == Securing ? (void*)&m_socket : 0;
296 if (name == "Compressor*")
297 return (void*)&m_compress;
298 if (name == "JBStream")
299 return (void*)this;
300 return RefObject::getObject(name);
301 }
302
303 // Get the string representation of this stream
toString() const304 const String& JBStream::toString() const
305 {
306 return m_name;
307 }
308
309 // Check if the stream has valid pending data
haveData()310 bool JBStream::haveData()
311 {
312 Lock2 lck(this,&m_socketMutex);
313 // Pending data with socket available for writing
314 if (m_pending.skipNull() && socketCanWrite())
315 return true;
316 // Pending events
317 if (m_events.skipNull())
318 return true;
319 // Pending incoming XML
320 XmlDocument* doc = m_xmlDom ? m_xmlDom->document() : 0;
321 XmlElement* root = doc ? doc->root(false) : 0;
322 XmlElement* first = root ? root->findFirstChild() : 0;
323 return first && first->completed();
324 }
325
326 // Retrieve connection address(es), port and status
connectAddr(String & addr,int & port,String & localip,int & stat,ObjList & srvs,bool * isRedirect) const327 void JBStream::connectAddr(String& addr, int& port, String& localip, int& stat,
328 ObjList& srvs, bool* isRedirect) const
329 {
330 if (m_redirectAddr) {
331 addr = m_redirectAddr;
332 port = m_redirectPort;
333 }
334 else {
335 addr = m_connectAddr;
336 port = m_connectPort;
337 }
338 if (isRedirect)
339 *isRedirect = !m_redirectAddr.null();
340 localip = m_localIp;
341 stat = m_connectStatus;
342 SrvRecord::copy(srvs,m_connectSrvs);
343 }
344
345 // Set/reset RosterRequested flag
setRosterRequested(bool ok)346 void JBStream::setRosterRequested(bool ok)
347 {
348 Lock lock(this);
349 if (ok == flag(RosterRequested))
350 return;
351 if (ok)
352 setFlags(RosterRequested);
353 else
354 resetFlags(RosterRequested);
355 XDebug(this,DebugAll,"%s roster requested flag [%p]",ok ? "Set" : "Reset",this);
356 }
357
358 // Set/reset AvailableResource/PositivePriority flags
setAvailableResource(bool ok,bool positive)359 bool JBStream::setAvailableResource(bool ok, bool positive)
360 {
361 Lock lock(this);
362 if (ok && positive)
363 setFlags(PositivePriority);
364 else
365 resetFlags(PositivePriority);
366 if (ok == flag(AvailableResource))
367 return false;
368 if (ok)
369 setFlags(AvailableResource);
370 else
371 resetFlags(AvailableResource);
372 XDebug(this,DebugAll,"%s available resource flag [%p]",ok ? "Set" : "Reset",this);
373 return true;
374 }
375
376 // Read data from socket. Send it to the parser
readSocket(char * buf,unsigned int len)377 bool JBStream::readSocket(char* buf, unsigned int len)
378 {
379 if (!(buf && len > 1))
380 return false;
381 if (!socketCanRead())
382 return false;
383 Lock2 lock(*this,m_socketMutex);
384 if (!socketCanRead() || state() == Destroy || state() == Idle || state() == Connecting)
385 return false;
386 socketSetReading(true);
387 if (state() != WaitTlsRsp)
388 len--;
389 else
390 len = 1;
391 lock.drop();
392 // Check stream state
393 XMPPError::Type error = XMPPError::NoError;
394 int read = m_socket->readData(buf,len);
395 Lock lck(m_socketMutex);
396 // Check if the connection is waiting to be reset
397 if (socketWaitReset()) {
398 socketSetReading(false);
399 return false;
400 }
401 // Check if something changed
402 if (!(m_socket && socketReading())) {
403 Debug(this,DebugAll,"Socket deleted while reading [%p]",this);
404 return false;
405 }
406 if (read && read != Socket::socketError()) {
407 if (!flag(StreamCompressed)) {
408 buf[read] = 0;
409 #ifdef JBSTREAM_DEBUG_SOCKET
410 Debug(this,DebugInfo,"Received %s [%p]",buf,this);
411 #endif
412 if (!m_xmlDom->parse(buf)) {
413 if (m_xmlDom->error() != XmlSaxParser::Incomplete)
414 error = XMPPError::Xml;
415 else if (m_xmlDom->buffer().length() > m_engine->m_maxIncompleteXml)
416 error = XMPPError::Policy;
417 }
418 }
419 else if (m_compress) {
420 #ifdef JBSTREAM_DEBUG_SOCKET
421 Debug(this,DebugInfo,"Received %d compressed bytes [%p]",read,this);
422 #endif
423 DataBlock d;
424 int res = m_compress->decompress(buf,read,d);
425 if (res == read) {
426 #ifdef JBSTREAM_DEBUG_COMPRESS
427 Debug(this,DebugInfo,"Decompressed %d --> %u [%p]",read,d.length(),this);
428 #endif
429 if (d.length()) {
430 char c = 0;
431 d.append(&c,1);
432 buf = (char*)d.data();
433 #ifdef JBSTREAM_DEBUG_SOCKET
434 Debug(this,DebugInfo,"Received compressed %s [%p]",buf,this);
435 #endif
436 if (!m_xmlDom->parse(buf)) {
437 if (m_xmlDom->error() != XmlSaxParser::Incomplete)
438 error = XMPPError::Xml;
439 else if (m_xmlDom->buffer().length() > m_engine->m_maxIncompleteXml)
440 error = XMPPError::Policy;
441 }
442 }
443 }
444 else
445 error = XMPPError::UndefinedCondition;
446 }
447 else
448 error = XMPPError::Internal;
449 }
450 socketSetReading(false);
451 if (read) {
452 if (read == Socket::socketError()) {
453 if (m_socket->canRetry()) {
454 read = 0;
455 #ifdef XDEBUG
456 String tmp;
457 Thread::errorString(tmp,m_socket->error());
458 Debug(this,DebugAll,"Socket temporary unavailable for read. %d: '%s' [%p]",
459 m_socket->error(),tmp.c_str(),this);
460 #endif
461 }
462 else
463 error = XMPPError::SocketError;
464 }
465 }
466 else
467 error = XMPPError::SocketError;
468 if (error == XMPPError::NoError) {
469 // Stop reading if waiting for TLS start and received a complete element
470 // We'll wait for the stream processor to handle the received element
471 if (read && state() == WaitTlsRsp && !m_xmlDom->buffer().length() &&
472 m_xmlDom->unparsed() == XmlSaxParser::None) {
473 XmlDocument* doc = m_xmlDom->document();
474 // If received a complete element, the parser's current element is
475 // the document's root
476 if (doc && m_xmlDom->isCurrent(doc->root())) {
477 DDebug(this,DebugAll,"Received complete element in state=%s. Stop reading [%p]",
478 stateName(),this);
479 socketSetCanRead(false);
480 }
481 }
482 return read > 0;
483 }
484 // Error
485 int location = 0;
486 String reason;
487 if (error != XMPPError::SocketError) {
488 if (error == XMPPError::Xml) {
489 reason << "Parser error '" << m_xmlDom->getError() << "'";
490 Debug(this,DebugNote,"%s buffer='%s' [%p]",
491 reason.c_str(),m_xmlDom->buffer().c_str(),this);
492 }
493 else if (error == XMPPError::UndefinedCondition) {
494 reason = "Decompression failure";
495 Debug(this,DebugNote,"Decompressor failure [%p]",this);
496 }
497 else if (error == XMPPError::Internal) {
498 reason = "Decompression failure";
499 Debug(this,DebugNote,"No decompressor [%p]",this);
500 }
501 else {
502 reason = "Parser error 'XML element too long'";
503 Debug(this,DebugNote,"Parser overflow len=%u max= %u [%p]",
504 m_xmlDom->buffer().length(),m_engine->m_maxIncompleteXml,this);
505 }
506 }
507 else if (read) {
508 String tmp;
509 Thread::errorString(tmp,m_socket->error());
510 reason << "Socket read error: " << tmp << " (" << m_socket->error() << ")";
511 Debug(this,DebugWarn,"%s [%p]",reason.c_str(),this);
512 }
513 else {
514 reason = "Stream EOF";
515 Debug(this,DebugInfo,"%s [%p]",reason.c_str(),this);
516 location = 1;
517 }
518 socketSetCanRead(false);
519 lck.drop();
520 postponeTerminate(location,m_incoming,error,reason);
521 return read > 0;
522 }
523
524 // Stream state processor
getEvent(u_int64_t time)525 JBEvent* JBStream::getEvent(u_int64_t time)
526 {
527 if (m_lastEvent)
528 return 0;
529 Lock lock(this);
530 if (m_lastEvent)
531 return 0;
532 XDebug(this,DebugAll,"JBStream::getEvent() [%p]",this);
533 checkPendingEvent();
534 if (!m_lastEvent) {
535 if (canProcess(time)) {
536 process(time);
537 checkPendingEvent();
538 if (!m_lastEvent)
539 checkTimeouts(time);
540 }
541 else
542 checkPendingEvent();
543 }
544 #ifdef XDEBUG
545 if (m_lastEvent)
546 Debug(this,DebugAll,"Generating event (%p,%s) in state '%s' [%p]",
547 m_lastEvent,m_lastEvent->name(),stateName(),this);
548 #endif
549 return m_lastEvent;
550 }
551
552 // Send a stanza ('iq', 'message' or 'presence') or dialback elements in Running state.
sendStanza(XmlElement * & xml)553 bool JBStream::sendStanza(XmlElement*& xml)
554 {
555 if (!xml)
556 return false;
557 DDebug(this,DebugAll,"sendStanza(%p) '%s' [%p]",xml,xml->tag(),this);
558 if (!(XMPPUtils::isStanza(*xml) ||
559 (m_type == s2s && XMPPUtils::hasXmlns(*xml,XMPPNamespace::Dialback)))) {
560 Debug(this,DebugNote,"Request to send non stanza xml='%s' [%p]",xml->tag(),this);
561 TelEngine::destruct(xml);
562 return false;
563 }
564 XmlElementOut* xo = new XmlElementOut(xml);
565 xml = 0;
566 xo->prepareToSend();
567 Lock lock(this);
568 m_pending.append(xo);
569 sendPending();
570 return true;
571 }
572
573 // Send stream related XML when negotiating the stream
574 // or some other stanza in non Running state
sendStreamXml(State newState,XmlElement * first,XmlElement * second,XmlElement * third)575 bool JBStream::sendStreamXml(State newState, XmlElement* first, XmlElement* second,
576 XmlElement* third)
577 {
578 DDebug(this,DebugAll,"sendStreamXml(%s,%p,%p,%p) [%p]",
579 stateName(),first,second,third,this);
580 Lock lock(this);
581 bool ok = false;
582 XmlFragment frag;
583 // Use a do while() to break to the end: safe cleanup
584 do {
585 if (m_state == Idle || m_state == Destroy)
586 break;
587 // Check if we have unsent stream xml
588 if (m_outStreamXml)
589 sendPending(true);
590 if (m_outStreamXml)
591 break;
592 if (!first)
593 break;
594 // Add stream declaration before stream start
595 if (first->getTag() == XMPPUtils::s_tag[XmlTag::Stream] &&
596 first->tag()[0] != '/') {
597 XmlDeclaration* decl = new XmlDeclaration;
598 decl->toString(m_outStreamXml,true);
599 frag.addChild(decl);
600 }
601 first->toString(m_outStreamXml,true,String::empty(),String::empty(),false);
602 frag.addChild(first);
603 if (second) {
604 second->toString(m_outStreamXml,true,String::empty(),String::empty(),false);
605 frag.addChild(second);
606 if (third) {
607 third->toString(m_outStreamXml,true,String::empty(),String::empty(),false);
608 frag.addChild(third);
609 }
610 }
611 first = second = third = 0;
612 if (flag(StreamCompressed) && !compress()) {
613 ok = false;
614 break;
615 }
616 m_engine->printXml(this,true,frag);
617 ok = sendPending(true);
618 } while (false);
619 TelEngine::destruct(first);
620 TelEngine::destruct(second);
621 TelEngine::destruct(third);
622 if (ok)
623 changeState(newState);
624 return ok;
625 }
626
627 // Start the stream. This method should be called by the upper layer
628 // when processing an incoming stream Start event
start(XMPPFeatureList * features,XmlElement * caps,bool useVer1)629 void JBStream::start(XMPPFeatureList* features, XmlElement* caps, bool useVer1)
630 {
631 Lock lock(this);
632 if (m_state != Starting)
633 return;
634 if (outgoing()) {
635 TelEngine::destruct(features);
636 TelEngine::destruct(caps);
637 if (m_type == c2s) {
638 // c2s: just wait for stream features
639 changeState(Features);
640 }
641 else if (m_type == s2s) {
642 // Wait features ?
643 if (flag(StreamRemoteVer1)) {
644 changeState(Features);
645 return;
646 }
647 // Stream not secured
648 if (!flag(StreamSecured)) {
649 // Accept dialback auth stream
650 // The namspace presence was already checked in checkStreamStart()
651 if (flag(TlsRequired)) {
652 terminate(0,false,0,XMPPError::EncryptionRequired);
653 return;
654 }
655 }
656 setFlags(StreamSecured);
657 serverStream()->sendDialback();
658 }
659 else if (m_type == cluster)
660 changeState(Features);
661 else if (m_type == comp)
662 serverStream()->startComp();
663 else
664 DDebug(this,DebugStub,"JBStream::start() not handled for type=%s",typeName());
665 return;
666 }
667 m_features.clear();
668 if (features)
669 m_features.add(*features);
670 if (useVer1 && flag(StreamRemoteVer1))
671 setFlags(StreamLocalVer1);
672 if (flag(StreamRemoteVer1) && flag(StreamLocalVer1)) {
673 // Set secured flag if we don't advertise TLS
674 if (!(flag(StreamSecured) || m_features.get(XMPPNamespace::Tls)))
675 setSecured();
676 // Set authenticated flag if we don't advertise authentication mechanisms
677 if (flag(StreamSecured)) {
678 if (flag(StreamAuthenticated))
679 m_features.remove(XMPPNamespace::Sasl);
680 else if (!m_features.get(XMPPNamespace::Sasl))
681 setFlags(StreamAuthenticated);
682 }
683 }
684 else
685 // c2s using non-sasl auth or s2s not using TLS
686 setSecured();
687 // Send start and features
688 XmlElement* s = buildStreamStart();
689 XmlElement* f = 0;
690 if (flag(StreamRemoteVer1) && flag(StreamLocalVer1))
691 f = m_features.buildStreamFeatures();
692 if (f && caps)
693 f->addChild(caps);
694 else
695 TelEngine::destruct(caps);
696 State newState = Features;
697 if (m_type == c2s) {
698 // Change stream state to Running if authenticated and there is no required
699 // feature to negotiate
700 if (flag(StreamAuthenticated) && !firstRequiredFeature())
701 newState = Running;
702 }
703 else if (m_type == s2s) {
704 // Change stream state to Running if authenticated and features list is empty
705 if (flag(StreamAuthenticated) && !m_features.skipNull())
706 newState = Running;
707 }
708 else if (m_type == cluster) {
709 // Change stream state to Running if authenticated and features list is empty
710 if (flag(StreamAuthenticated) && !m_features.skipNull())
711 newState = Running;
712 }
713 sendStreamXml(newState,s,f);
714 }
715
716 // Authenticate an incoming stream
authenticated(bool ok,const String & rsp,XMPPError::Type error,const char * username,const char * id,const char * resource)717 bool JBStream::authenticated(bool ok, const String& rsp, XMPPError::Type error,
718 const char* username, const char* id, const char* resource)
719 {
720 Lock lock(this);
721 if (m_state != Auth || !incoming())
722 return false;
723 DDebug(this,DebugAll,"authenticated(%s,'%s',%s) local=%s [%p]",
724 String::boolText(ok),rsp.safe(),XMPPUtils::s_error[error].c_str(),
725 m_local.c_str(),this);
726 if (ok) {
727 if (m_type == c2s) {
728 if (m_sasl) {
729 // Set remote party node if provided
730 if (!TelEngine::null(username)) {
731 m_remote.set(username,m_local.domain(),"");
732 Debug(this,DebugAll,"Remote party set to '%s' [%p]",m_remote.c_str(),this);
733 }
734 String text;
735 m_sasl->buildAuthRspReply(text,rsp);
736 XmlElement* s = XMPPUtils::createElement(XmlTag::Success,
737 XMPPNamespace::Sasl,text);
738 ok = sendStreamXml(WaitStart,s);
739 }
740 else if (m_features.get(XMPPNamespace::IqAuth)) {
741 // Set remote party if not provided
742 if (!TelEngine::null(username))
743 m_remote.set(username,m_local.domain(),resource);
744 else
745 m_remote.resource(resource);
746 if (m_remote.isFull()) {
747 Debug(this,DebugAll,"Remote party set to '%s' [%p]",m_remote.c_str(),this);
748 XmlElement* rsp = XMPPUtils::createIqResult(0,0,id,
749 XMPPUtils::createElement(XmlTag::Query,XMPPNamespace::IqAuth));
750 ok = sendStreamXml(Running,rsp);
751 if (!ok)
752 m_remote.set(m_local.domain());
753 }
754 else
755 terminate(0,true,0,XMPPError::Internal);
756 }
757 else
758 terminate(0,true,0,XMPPError::Internal);
759 }
760 else if (m_type == s2s)
761 ok = false;
762 else if (m_type == comp) {
763 XmlElement* rsp = XMPPUtils::createElement(XmlTag::Handshake);
764 ok = sendStreamXml(Running,rsp);
765 }
766 if (ok) {
767 m_features.remove(XMPPNamespace::Sasl);
768 m_features.remove(XMPPNamespace::IqAuth);
769 setFlags(StreamAuthenticated);
770 }
771 }
772 else {
773 if (m_type == c2s) {
774 XmlElement* rsp = 0;
775 if (m_sasl)
776 rsp = XMPPUtils::createFailure(XMPPNamespace::Sasl,error);
777 else {
778 rsp = XMPPUtils::createIq(XMPPUtils::IqError,0,0,id);
779 if (TelEngine::null(id))
780 rsp->addChild(XMPPUtils::createElement(XmlTag::Query,XMPPNamespace::IqAuth));
781 rsp->addChild(XMPPUtils::createError(XMPPError::TypeAuth,error));
782 }
783 ok = sendStreamXml(Features,rsp);
784 }
785 else if (m_type == s2s)
786 ok = false;
787 else if (m_type == comp)
788 terminate(0,true,0,XMPPError::NotAuthorized);
789 }
790 TelEngine::destruct(m_sasl);
791 return ok;
792 }
793
794 // Terminate the stream. Send stream end tag or error.
795 // Reset the stream. Deref stream if destroying.
terminate(int location,bool destroy,XmlElement * xml,int error,const char * reason,bool final,bool genEvent,const char * content)796 void JBStream::terminate(int location, bool destroy, XmlElement* xml, int error,
797 const char* reason, bool final, bool genEvent, const char* content)
798 {
799 XDebug(this,DebugAll,"terminate(%d,%u,%p,%u,%s,%u) state=%s [%p]",
800 location,destroy,xml,error,reason,final,stateName(),this);
801 Lock lock(this);
802 m_pending.clear();
803 m_outXmlCompress.clear();
804 resetPostponedTerminate();
805 // Already in destroy
806 if (state() == Destroy) {
807 TelEngine::destruct(xml);
808 return;
809 }
810 bool sendEndTag = true;
811 destroy = destroy || final || flag(NoAutoRestart);
812 // Set error flag
813 if (state() == Running) {
814 if (error != XMPPError::NoError)
815 setFlags(InError);
816 else
817 resetFlags(InError);
818 }
819 else
820 setFlags(InError);
821 if (flag(InError)) {
822 // Reset re-connect counter if not internal policy error
823 if (location || error != XMPPError::Policy)
824 m_restart = 0;
825 }
826 if (error == XMPPError::NoError && m_engine->exiting())
827 error = XMPPError::Shutdown;
828 // Last check for sendEndTag
829 if (sendEndTag) {
830 // Prohibitted states or socket read/write error
831 if (m_state == Destroy || m_state == Securing || m_state == Connecting)
832 sendEndTag = false;
833 else if (error == XMPPError::SocketError) {
834 sendEndTag = false;
835 reason = "I/O error";
836 }
837 }
838 Debug(this,DebugAll,
839 "Terminate by '%s' state=%s destroy=%u error=%s reason='%s' final=%u [%p]",
840 lookup(location,s_location),stateName(),destroy,
841 XMPPUtils::s_error[error].c_str(),reason,final,this);
842 if (sendEndTag) {
843 XmlElement* start = 0;
844 if (m_state == Starting && incoming())
845 start = buildStreamStart();
846 XmlElement* end = new XmlElement(String("/stream:stream"),false);
847 if (error != XMPPError::NoError && location < 1) {
848 XmlElement* e = XMPPUtils::createStreamError(error,reason,content);
849 if (!start)
850 sendStreamXml(m_state,e,end);
851 else
852 sendStreamXml(m_state,start,e,end);
853 }
854 else {
855 if (!start)
856 sendStreamXml(m_state,end);
857 else
858 sendStreamXml(m_state,start,end);
859 }
860 }
861 resetConnection();
862 m_outStreamXml.clear();
863 m_outStreamXmlCompress.clear();
864
865 // Always set termination event, except when called from destructor
866 if (genEvent && !(final || m_terminateEvent)) {
867 // TODO: Cancel all outgoing elements without id
868 m_terminateEvent = new JBEvent(destroy ? JBEvent::Destroy : JBEvent::Terminated,
869 this,xml);
870 xml = 0;
871 if (!m_terminateEvent->m_text) {
872 if (TelEngine::null(reason))
873 m_terminateEvent->m_text = XMPPUtils::s_error[error];
874 else
875 m_terminateEvent->m_text = reason;
876 }
877 }
878 TelEngine::destruct(xml);
879
880 changeState(destroy ? Destroy : Idle);
881 }
882
883 // Close the stream. Release memory
destroyed()884 void JBStream::destroyed()
885 {
886 terminate(0,true,0,XMPPError::NoError,"",true);
887 resetConnection();
888 if (m_engine)
889 m_engine->removeStream(this,false);
890 TelEngine::destruct(m_terminateEvent);
891 DDebug(this,DebugAll,"Destroyed local=%s remote=%s [%p]",
892 m_local.safe(),m_remote.safe(),this);
893 RefObject::destroyed();
894 }
895
896 // Check if stream state processor can continue
897 // This method is called from getEvent() with the stream locked
canProcess(u_int64_t time)898 bool JBStream::canProcess(u_int64_t time)
899 {
900 if (outgoing()) {
901 // Increase stream restart counter if it's time to and should auto restart
902 if (!flag(NoAutoRestart) && m_timeToFillRestart < time) {
903 m_timeToFillRestart = time + m_engine->m_restartUpdInterval;
904 if (m_restart < m_engine->m_restartMax) {
905 m_restart++;
906 DDebug(this,DebugAll,"Restart count set to %u max=%u [%p]",
907 m_restart,m_engine->m_restartMax,this);
908 }
909 }
910 if (state() == Idle) {
911 // Re-connect
912 bool conn = (m_connectStatus > JBConnect::Start);
913 if (!conn && m_restart) {
914 // Don't connect non client/component or cluster if we are in error and
915 // have nothing to send
916 if (m_type != c2s && m_type != comp && m_type != cluster &&
917 flag(InError) && !m_pending.skipNull())
918 return false;
919 conn = true;
920 m_restart--;
921 }
922 if (conn) {
923 resetFlags(InError);
924 changeState(Connecting);
925 m_engine->connectStream(this);
926 return false;
927 }
928 // Destroy if not auto-restarting
929 if (flag(NoAutoRestart)) {
930 terminate(0,true,0);
931 return false;
932 }
933 }
934 }
935 else if (state() == Idle && flag(NoAutoRestart)) {
936 terminate(0,true,0);
937 return false;
938 }
939 return true;
940 }
941
942 // Process stream state. Get XML from parser's queue and process it
943 // This method is called from getEvent() with the stream locked
process(u_int64_t time)944 void JBStream::process(u_int64_t time)
945 {
946 if (!m_xmlDom)
947 return;
948 XDebug(this,DebugAll,"JBStream::process() [%p]",this);
949 while (true) {
950 sendPending();
951 if (m_terminateEvent)
952 break;
953
954 // Lock the parser to obtain the root and/or child
955 // Unlock it before processing received element
956 Lock lockDoc(m_socketMutex);
957 XmlDocument* doc = m_xmlDom ? m_xmlDom->document() : 0;
958 XmlElement* root = doc ? doc->root(false) : 0;
959 if (!root)
960 break;
961
962 if (m_state == WaitStart) {
963 // Print the declaration
964 XmlDeclaration* dec = doc->declaration();
965 if (dec)
966 m_engine->printXml(this,false,*dec);
967 XmlElement xml(*root);
968 lockDoc.drop();
969 // Print the root. Make sure we don't print its children
970 xml.clearChildren();
971 m_engine->printXml(this,false,xml);
972 // Check if valid
973 if (!XMPPUtils::isTag(xml,XmlTag::Stream,XMPPNamespace::Stream)) {
974 String* ns = xml.xmlns();
975 Debug(this,DebugMild,"Received invalid stream root '%s' namespace='%s' [%p]",
976 xml.tag(),TelEngine::c_safe(ns),this);
977 terminate(0,true,0);
978 break;
979 }
980 // Check 'from' and 'to'
981 JabberID from;
982 JabberID to;
983 if (!getJids(&xml,from,to))
984 break;
985 DDebug(this,DebugAll,"Processing root '%s' in state %s [%p]",
986 xml.tag(),stateName(),this);
987 processStart(&xml,from,to);
988 break;
989 }
990
991 XmlElement* xml = root->pop();
992 if (!xml) {
993 if (root->completed())
994 socketSetCanRead(false);
995 if (m_events.skipNull())
996 break;
997 if (!root->completed()) {
998 if (m_ppTerminate && !(m_pending.skipNull() && socketCanWrite())) {
999 lockDoc.drop();
1000 postponedTerminate();
1001 }
1002 break;
1003 }
1004 DDebug(this,DebugAll,"Remote closed the stream in state %s [%p]",
1005 stateName(),this);
1006 lockDoc.drop();
1007 resetPostponedTerminate();
1008 terminate(1,false,0);
1009 break;
1010 }
1011 lockDoc.drop();
1012
1013 // Process received element
1014 // Print it
1015 m_engine->printXml(this,false,*xml);
1016 // Check stream termination
1017 if (streamError(xml))
1018 break;
1019 // Check 'from' and 'to'
1020 JabberID from;
1021 JabberID to;
1022 if (!getJids(xml,from,to))
1023 break;
1024 // Restart the idle timer
1025 setIdleTimer(time);
1026 // Check if a received stanza is valid and allowed in current state
1027 if (!checkStanzaRecv(xml,from,to))
1028 break;
1029
1030 DDebug(this,DebugAll,"Processing (%p,%s) in state %s [%p]",
1031 xml,xml->tag(),stateName(),this);
1032
1033 // Process here dialback verify
1034 if (m_type == s2s && isDbVerify(*xml)) {
1035 switch (state()) {
1036 case Running:
1037 case Features:
1038 case Starting:
1039 case Challenge:
1040 case Auth:
1041 m_events.append(new JBEvent(JBEvent::DbVerify,this,xml,from,to));
1042 break;
1043 default:
1044 dropXml(xml,"dialback verify in unsupported state");
1045 }
1046 continue;
1047 }
1048
1049 switch (m_state) {
1050 case Running:
1051 processRunning(xml,from,to);
1052 // Reset ping
1053 setNextPing(true);
1054 m_pingId = "";
1055 break;
1056 case Features:
1057 if (m_incoming)
1058 processFeaturesIn(xml,from,to);
1059 else
1060 processFeaturesOut(xml,from,to);
1061 break;
1062 case WaitStart:
1063 case Starting:
1064 processStart(xml,from,to);
1065 TelEngine::destruct(xml);
1066 break;
1067 case Challenge:
1068 processChallenge(xml,from,to);
1069 break;
1070 case Auth:
1071 processAuth(xml,from,to);
1072 break;
1073 case WaitTlsRsp:
1074 processWaitTlsRsp(xml,from,to);
1075 break;
1076 case Register:
1077 processRegister(xml,from,to);
1078 break;
1079 case Compressing:
1080 processCompressing(xml,from,to);
1081 break;
1082 default:
1083 dropXml(xml,"unhandled stream state in process()");
1084 }
1085 break;
1086 }
1087 XDebug(this,DebugAll,"JBStream::process() exiting [%p]",this);
1088 }
1089
1090 // Process elements in Running state
processRunning(XmlElement * xml,const JabberID & from,const JabberID & to)1091 bool JBStream::processRunning(XmlElement* xml, const JabberID& from, const JabberID& to)
1092 {
1093 if (!xml)
1094 return true;
1095 int t, ns;
1096 if (!XMPPUtils::getTag(*xml,t,ns))
1097 return dropXml(xml,"failed to retrieve element tag");
1098 switch (t) {
1099 case XmlTag::Message:
1100 if (ns != m_xmlns)
1101 break;
1102 m_events.append(new JBEvent(JBEvent::Message,this,xml,from,to));
1103 return true;
1104 case XmlTag::Presence:
1105 if (ns != m_xmlns)
1106 break;
1107 m_events.append(new JBEvent(JBEvent::Presence,this,xml,from,to));
1108 return true;
1109 case XmlTag::Iq:
1110 if (ns != m_xmlns)
1111 break;
1112 checkPing(this,xml,m_pingId);
1113 m_events.append(new JBEvent(JBEvent::Iq,this,xml,from,to,xml->findFirstChild()));
1114 return true;
1115 default:
1116 m_events.append(new JBEvent(JBEvent::Unknown,this,xml,from,to));
1117 return true;
1118 }
1119 // Invalid stanza namespace
1120 XmlElement* rsp = XMPPUtils::createError(xml,XMPPError::TypeModify,
1121 XMPPError::InvalidNamespace,"Only stanzas in default namespace are allowed");
1122 sendStanza(rsp);
1123 return true;
1124 }
1125
1126 // Check stream timeouts
1127 // This method is called from getEvent() with the stream locked, after
checkTimeouts(u_int64_t time)1128 void JBStream::checkTimeouts(u_int64_t time)
1129 {
1130 if (m_ppTerminateTimeout && m_ppTerminateTimeout <= time) {
1131 m_ppTerminateTimeout = 0;
1132 Debug(this,DebugAll,"Postponed termination timed out [%p]",this);
1133 if (postponedTerminate())
1134 return;
1135 }
1136 // Running: check ping and idle timers
1137 if (m_state == Running) {
1138 const char* reason = 0;
1139 if (m_pingTimeout) {
1140 if (m_pingTimeout < time) {
1141 Debug(this,DebugNote,"Ping stanza with id '%s' timed out [%p]",m_pingId.c_str(),this);
1142 reason = "Ping timeout";
1143 }
1144 }
1145 else if (m_nextPing && time >= m_nextPing) {
1146 XmlElement* ping = setNextPing(false);
1147 if (ping) {
1148 DDebug(this,DebugAll,"Sending ping with id=%s [%p]",m_pingId.c_str(),this);
1149 if (!sendStanza(ping))
1150 m_pingId = "";
1151 }
1152 else {
1153 resetPing();
1154 m_pingId = "";
1155 }
1156 }
1157 if (m_idleTimeout && m_idleTimeout < time && !reason)
1158 reason = "Stream idle";
1159 if (reason)
1160 terminate(0,m_incoming,0,XMPPError::ConnTimeout,reason);
1161 return;
1162 }
1163 // Stream setup timer
1164 if (m_setupTimeout && m_setupTimeout < time) {
1165 terminate(0,m_incoming,0,XMPPError::Policy,"Stream setup timeout");
1166 return;
1167 }
1168 // Stream start timer
1169 if (m_startTimeout && m_startTimeout < time) {
1170 terminate(0,m_incoming,0,XMPPError::Policy,"Stream start timeout");
1171 return;
1172 }
1173 // Stream connect timer
1174 if (m_connectTimeout && m_connectTimeout < time) {
1175 DDebug(this,DebugNote,"Connect timed out stat=%s [%p]",
1176 lookup(m_connectStatus,JBConnect::s_statusName),this);
1177 // Don't terminate if there are more connect options
1178 if (state() == Connecting && m_connectStatus > JBConnect::Start) {
1179 m_engine->stopConnect(toString());
1180 m_engine->connectStream(this);
1181 }
1182 else
1183 terminate(0,m_incoming,0,XMPPError::ConnTimeout,"Stream connect timeout");
1184 return;
1185 }
1186 }
1187
1188 // Reset the stream's connection. Build a new XML parser if the socket is valid
resetConnection(Socket * sock)1189 void JBStream::resetConnection(Socket* sock)
1190 {
1191 DDebug(this,DebugAll,"JBStream::resetConnection(%p) current=%p [%p]",
1192 sock,m_socket,this);
1193 // Release the old one
1194 if (m_socket) {
1195 m_socketMutex.lock();
1196 m_socketFlags |= SocketWaitReset;
1197 m_socketMutex.unlock();
1198 // Wait for the socket to become available (not reading or writing)
1199 Socket* tmp = 0;
1200 while (true) {
1201 Lock lock(m_socketMutex);
1202 if (!(m_socket && (socketReading() || socketWriting()))) {
1203 tmp = m_socket;
1204 m_socket = 0;
1205 m_socketFlags = 0;
1206 if (m_xmlDom) {
1207 delete m_xmlDom;
1208 m_xmlDom = 0;
1209 }
1210 TelEngine::destruct(m_compress);
1211 break;
1212 }
1213 lock.drop();
1214 Thread::yield(false);
1215 }
1216 if (tmp) {
1217 tmp->setLinger(-1);
1218 tmp->terminate();
1219 delete tmp;
1220 }
1221 }
1222 resetPostponedTerminate();
1223 if (sock) {
1224 Lock lock(m_socketMutex);
1225 if (m_socket) {
1226 Debug(this,DebugWarn,"Duplicate attempt to set socket! [%p]",this);
1227 delete sock;
1228 return;
1229 }
1230 m_xmlDom = new XmlDomParser(debugName());
1231 m_xmlDom->debugChain(this);
1232 m_socket = sock;
1233 if (debugAt(DebugAll)) {
1234 SocketAddr l, r;
1235 localAddr(l);
1236 remoteAddr(r);
1237 Debug(this,DebugAll,"Connection set local=%s:%d remote=%s:%d sock=%p [%p]",
1238 l.host().c_str(),l.port(),r.host().c_str(),r.port(),m_socket,this);
1239 }
1240 m_socket->setReuse(true);
1241 m_socket->setBlocking(false);
1242 socketSetCanRead(true);
1243 socketSetCanWrite(true);
1244 }
1245 }
1246
1247 // Build a ping iq stanza
buildPing(const String & stanzaId)1248 XmlElement* JBStream::buildPing(const String& stanzaId)
1249 {
1250 return 0;
1251 }
1252
1253 // Build a stream start XML element
buildStreamStart()1254 XmlElement* JBStream::buildStreamStart()
1255 {
1256 XmlElement* start = new XmlElement(XMPPUtils::s_tag[XmlTag::Stream],false);
1257 if (incoming())
1258 start->setAttribute("id",m_id);
1259 XMPPUtils::setStreamXmlns(*start);
1260 start->setAttribute(XmlElement::s_ns,XMPPUtils::s_ns[m_xmlns]);
1261 start->setAttributeValid("from",m_local.bare());
1262 start->setAttributeValid("to",m_remote.bare());
1263 if (outgoing() || flag(StreamRemoteVer1))
1264 start->setAttribute("version","1.0");
1265 start->setAttribute("xml:lang","en");
1266 return start;
1267 }
1268
1269 // Process received elements in WaitStart state
1270 // WaitStart: Incoming: waiting for stream start
1271 // Outgoing: idem (our stream start was already sent)
1272 // Return false if stream termination was initiated
processStart(const XmlElement * xml,const JabberID & from,const JabberID & to)1273 bool JBStream::processStart(const XmlElement* xml, const JabberID& from,
1274 const JabberID& to)
1275 {
1276 Debug(this,DebugStub,"JBStream::processStart(%s) [%p]",xml->tag(),this);
1277 return true;
1278 }
1279
1280 // Process elements in Compressing state
1281 // Return false if stream termination was initiated
processCompressing(XmlElement * xml,const JabberID & from,const JabberID & to)1282 bool JBStream::processCompressing(XmlElement* xml, const JabberID& from,
1283 const JabberID& to)
1284 {
1285 XDebug(this,DebugAll,"JBStream::processCompressing() [%p]",this);
1286 int t = XmlTag::Count;
1287 int n = XMPPNamespace::Count;
1288 XMPPUtils::getTag(*xml,t,n);
1289 if (outgoing()) {
1290 if (n != XMPPNamespace::Compress)
1291 return dropXml(xml,"expecting compression namespace");
1292 // Expecting: compressed/failure
1293 bool ok = (t == XmlTag::Compressed);
1294 if (!ok && t != XmlTag::Failure)
1295 return dropXml(xml,"expecting compress response (compressed/failure)");
1296 if (ok) {
1297 if (m_compress)
1298 setFlags(StreamCompressed);
1299 else
1300 return destroyDropXml(xml,XMPPError::Internal,"no compressor");
1301 }
1302 else {
1303 XmlElement* ch = xml->findFirstChild();
1304 Debug(this,DebugInfo,"Compress failed at remote party error=%s [%p]",
1305 ch ? ch->tag() : "",this);
1306 TelEngine::destruct(m_compress);
1307 }
1308 TelEngine::destruct(xml);
1309 // Restart the stream on success
1310 if (ok) {
1311 XmlElement* s = buildStreamStart();
1312 return sendStreamXml(WaitStart,s);
1313 }
1314 // Compress failed: continue
1315 JBServerStream* server = serverStream();
1316 if (server)
1317 return server->sendDialback();
1318 JBClientStream* client = clientStream();
1319 if (client)
1320 return client->bind();
1321 Debug(this,DebugNote,"Unhandled stream type in %s state [%p]",stateName(),this);
1322 terminate(0,true,0,XMPPError::Internal);
1323 return true;
1324 }
1325 // Authenticated incoming s2s waiting for compression or any other element
1326 if (type() == s2s && m_features.get(XMPPNamespace::CompressFeature)) {
1327 if (t == XmlTag::Compress && n == XMPPNamespace::Compress)
1328 return handleCompressReq(xml);
1329 // Change state to Running
1330 changeState(Running);
1331 return processRunning(xml,from,to);
1332 }
1333
1334 return dropXml(xml,"not implemented");
1335 }
1336
1337 // Process elements in Register state
processRegister(XmlElement * xml,const JabberID & from,const JabberID & to)1338 bool JBStream::processRegister(XmlElement* xml, const JabberID& from,
1339 const JabberID& to)
1340 {
1341 dropXml(xml,"can't process in this state");
1342 terminate(0,true,0,XMPPError::Internal);
1343 return false;
1344 }
1345
1346 // Process elements in Auth state
processAuth(XmlElement * xml,const JabberID & from,const JabberID & to)1347 bool JBStream::processAuth(XmlElement* xml, const JabberID& from,
1348 const JabberID& to)
1349 {
1350 return dropXml(xml,"can't process in this state");
1351 }
1352
1353 // Check if a received start start element's namespaces are correct.
processStreamStart(const XmlElement * xml)1354 bool JBStream::processStreamStart(const XmlElement* xml)
1355 {
1356 XDebug(this,DebugAll,"JBStream::processStreamStart() [%p]",this);
1357 if (m_state == Starting)
1358 return true;
1359 changeState(Starting);
1360 if (!XMPPUtils::hasDefaultXmlns(*xml,m_xmlns)) {
1361 Debug(this,DebugNote,"Received '%s' with invalid xmlns='%s' [%p]",
1362 xml->tag(),TelEngine::c_safe(xml->xmlns()),this);
1363 terminate(0,m_incoming,0,XMPPError::InvalidNamespace);
1364 return false;
1365 }
1366 XMPPError::Type error = XMPPError::NoError;
1367 const char* reason = 0;
1368 while (true) {
1369 if (m_type != c2s && m_type != s2s && m_type != comp && m_type != cluster) {
1370 Debug(this,DebugStub,"processStreamStart() type %u not handled!",m_type);
1371 error = XMPPError::Internal;
1372 break;
1373 }
1374 // Check xmlns:stream
1375 String* nsStr = xml->getAttribute("xmlns:stream");
1376 if (!nsStr || *nsStr != XMPPUtils::s_ns[XMPPNamespace::Stream]) {
1377 Debug(this,DebugNote,"Received '%s' with invalid xmlns:stream='%s' [%p]",
1378 xml->tag(),TelEngine::c_safe(nsStr),this);
1379 error = XMPPError::InvalidNamespace;
1380 break;
1381 }
1382 // Check version
1383 String ver(xml->getAttribute("version"));
1384 int remoteVersion = -1;
1385 if (ver) {
1386 int pos = ver.find('.');
1387 if (pos > 0)
1388 remoteVersion = ver.substr(0,pos).toInteger(-1);
1389 }
1390 if (remoteVersion == 1)
1391 setFlags(StreamRemoteVer1);
1392 else if (remoteVersion < 1) {
1393 if (m_type == c2s)
1394 XDebug(this,DebugAll,"c2s stream start with version < 1 [%p]",this);
1395 else if (m_type == s2s) {
1396 // Accept invalid/unsupported version only if TLS is not required
1397 if (!flag(TlsRequired)) {
1398 // Check dialback
1399 if (!xml->hasAttribute("xmlns:db",XMPPUtils::s_ns[XMPPNamespace::Dialback]))
1400 error = XMPPError::InvalidNamespace;
1401 }
1402 else
1403 error = XMPPError::EncryptionRequired;
1404 }
1405 else if (m_type != comp)
1406 error = XMPPError::Internal;
1407 }
1408 else if (remoteVersion > 1)
1409 error = XMPPError::UnsupportedVersion;
1410 if (error != XMPPError::NoError) {
1411 Debug(this,DebugNote,"Unacceptable '%s' version='%s' error=%s [%p]",
1412 xml->tag(),ver.c_str(),XMPPUtils::s_error[error].c_str(),this);
1413 break;
1414 }
1415 // Set stream id: generate one for incoming, get it from xml if outgoing
1416 if (incoming()) {
1417 // Generate a random, variable length stream id
1418 MD5 md5(String((int)(int64_t)this));
1419 md5 << m_name << String((int)Time::msecNow());
1420 m_id = md5.hexDigest();
1421 m_id << "_" << String((int)Random::random());
1422 }
1423 else {
1424 m_id = xml->getAttribute("id");
1425 if (!m_id) {
1426 Debug(this,DebugNote,"Received '%s' with empty stream id [%p]",
1427 xml->tag(),this);
1428 reason = "Missing stream id";
1429 error = XMPPError::InvalidId;
1430 break;
1431 }
1432 }
1433 XDebug(this,DebugAll,"Stream id set to '%s' [%p]",m_id.c_str(),this);
1434 break;
1435 }
1436 if (error == XMPPError::NoError)
1437 return true;
1438 terminate(0,m_incoming,0,error,reason);
1439 return false;
1440 }
1441
1442 // Handle an already checked (tag and namespace) compress request
1443 // Respond to it. Change stream state on success
handleCompressReq(XmlElement * xml)1444 bool JBStream::handleCompressReq(XmlElement* xml)
1445 {
1446 XMPPError::Type error = XMPPError::UnsupportedMethod;
1447 State newState = state();
1448 XmlElement* rsp = 0;
1449 XmlElement* m = XMPPUtils::findFirstChild(*xml,XmlTag::Method,
1450 XMPPNamespace::Compress);
1451 if (m) {
1452 // Get and check the method
1453 const String& method = m->getText();
1454 XMPPFeatureCompress* c = m_features.getCompress();
1455 if (method && c && c->hasMethod(method)) {
1456 // Build the (de)compressor
1457 Lock lock(m_socketMutex);
1458 m_engine->compressStream(this,method);
1459 if (m_compress) {
1460 newState = WaitStart;
1461 setFlags(SetCompressed);
1462 m_features.remove(XMPPNamespace::CompressFeature);
1463 rsp = XMPPUtils::createElement(XmlTag::Compressed,XMPPNamespace::Compress);
1464 }
1465 else
1466 error = XMPPError::SetupFailed;
1467 }
1468 }
1469 TelEngine::destruct(xml);
1470 if (!rsp)
1471 rsp = XMPPUtils::createFailure(XMPPNamespace::Compress,error);
1472 return sendStreamXml(newState,rsp);
1473 }
1474
1475 // Check if a received element is a stream error one
streamError(XmlElement * xml)1476 bool JBStream::streamError(XmlElement* xml)
1477 {
1478 if (!(xml && XMPPUtils::isTag(*xml,XmlTag::Error,XMPPNamespace::Stream)))
1479 return false;
1480 String text;
1481 String error;
1482 String content;
1483 XMPPUtils::decodeError(xml,XMPPNamespace::StreamError,&error,&text,&content);
1484 Debug(this,DebugAll,"Received stream error '%s' content='%s' text='%s' in state %s [%p]",
1485 error.c_str(),content.c_str(),text.c_str(),stateName(),this);
1486 int err = XMPPUtils::s_error[error];
1487 if (err >= XMPPError::Count)
1488 err = XMPPError::NoError;
1489 String rAddr;
1490 int rPort = 0;
1491 if (err == XMPPError::SeeOther && content) {
1492 if (m_redirectCount < m_redirectMax) {
1493 int pos = content.rfind(':');
1494 if (pos >= 0) {
1495 rAddr = content.substr(0,pos);
1496 if (rAddr) {
1497 rPort = content.substr(pos + 1).toInteger(0);
1498 if (rPort < 0)
1499 rPort = 0;
1500 }
1501 }
1502 else
1503 rAddr = content;
1504 if (rAddr) {
1505 // Check if the connect destination is different
1506 SocketAddr remoteIp;
1507 remoteAddr(remoteIp);
1508 bool sameDest = (rAddr == serverHost()) || (rAddr == m_connectAddr) || (rAddr == remoteIp.host());
1509 if (sameDest) {
1510 sameDest = ((rPort > 0 ? rPort : XMPP_C2S_PORT) == remoteIp.port());
1511 if (sameDest) {
1512 Debug(this,DebugNote,"Ignoring redirect to same destination [%p]",this);
1513 rAddr = "";
1514 }
1515 }
1516 }
1517 }
1518 }
1519 terminate(1,false,xml,err,text,false,rAddr.null());
1520 setRedirect(rAddr,rPort);
1521 if (rAddr) {
1522 resetFlags(InError);
1523 resetConnectStatus();
1524 changeState(Connecting);
1525 m_engine->connectStream(this);
1526 setRedirect();
1527 }
1528 return true;
1529 }
1530
1531 // Check if a received element has valid 'from' and 'to' jid attributes
getJids(XmlElement * xml,JabberID & from,JabberID & to)1532 bool JBStream::getJids(XmlElement* xml, JabberID& from, JabberID& to)
1533 {
1534 if (!xml)
1535 return true;
1536 from = xml->getAttribute("from");
1537 to = xml->getAttribute("to");
1538 XDebug(this,DebugAll,"Got jids xml='%s' from='%s' to='%s' [%p]",
1539 xml->tag(),from.c_str(),to.c_str(),this);
1540 if (to.valid() && from.valid())
1541 return true;
1542 Debug(this,DebugNote,"Received '%s' with bad from='%s' or to='%s' [%p]",
1543 xml->tag(),from.c_str(),to.c_str(),this);
1544 terminate(0,m_incoming,xml,XMPPError::BadAddressing);
1545 return false;
1546 }
1547
1548 // Check if a received element is a presence, message or iq qualified by the stream
1549 // namespace and the stream is not authenticated
1550 // Validate 'from' for c2s streams
1551 // Validate s2s 'to' domain and 'from' jid
checkStanzaRecv(XmlElement * xml,JabberID & from,JabberID & to)1552 bool JBStream::checkStanzaRecv(XmlElement* xml, JabberID& from, JabberID& to)
1553 {
1554 if (!XMPPUtils::isStanza(*xml))
1555 return true;
1556
1557 // RFC 3920bis 5.2: Accept stanzas only if the stream was authenticated
1558 // Accept IQs in jabber:iq:register namespace
1559 // Accept IQs in jabber:iq:auth namespace
1560 // They might be received on a non authenticated stream)
1561 if (!flag(StreamAuthenticated)) {
1562 bool isIq = XMPPUtils::isTag(*xml,XmlTag::Iq,m_xmlns);
1563 bool valid = isIq && XMPPUtils::findFirstChild(*xml,XmlTag::Count,
1564 XMPPNamespace::IqRegister);
1565 JBClientStream* c2s = clientStream();
1566 if (!valid && c2s) {
1567 // Outgoing client stream: check register responses
1568 // Incoming client stream: check auth stanzas
1569 if (outgoing())
1570 valid = c2s->isRegisterId(*xml);
1571 else
1572 valid = isIq && XMPPUtils::findFirstChild(*xml,XmlTag::Count,
1573 XMPPNamespace::IqAuth);
1574 }
1575 if (!valid) {
1576 terminate(0,false,xml,XMPPError::NotAuthorized,
1577 "Can't accept stanza on non authorized stream");
1578 return false;
1579 }
1580 }
1581
1582 switch (m_type) {
1583 case c2s:
1584 if (m_incoming) {
1585 // Check for valid from
1586 if (from && !m_remote.match(from)) {
1587 XmlElement* e = XMPPUtils::createError(xml,
1588 XMPPError::TypeModify,XMPPError::BadAddressing);
1589 sendStanza(e);
1590 return false;
1591 }
1592 // Make sure the upper layer always has the full jid
1593 if (!from)
1594 from = m_remote;
1595 else if (!from.resource())
1596 from.resource(m_remote.resource());
1597 }
1598 else {
1599 XDebug(this,DebugStub,
1600 "Possible checkStanzaRecv() unhandled outgoing c2s stream [%p]",this);
1601 }
1602 break;
1603 case comp:
1604 case s2s:
1605 // RFC 3920bis 9.1.1.2 and 9.1.2.1:
1606 // Validate 'to' and 'from'
1607 // Accept anything for component streams
1608 if (!(to && from)) {
1609 terminate(0,m_incoming,xml,XMPPError::BadAddressing);
1610 return false;
1611 }
1612 // TODO: Find an outgoing stream and send stanza error to the remote server
1613 // instead of terminating the stream
1614 if (m_type == s2s) {
1615 if (incoming()) {
1616 // Accept stanzas only for validated domains
1617 if (!serverStream()->hasRemoteDomain(from.domain())) {
1618 terminate(0,m_incoming,xml,XMPPError::BadAddressing);
1619 return false;
1620 }
1621 }
1622 else {
1623 // We should not receive any stanza on outgoing s2s
1624 terminate(0,m_incoming,xml,XMPPError::NotAuthorized);
1625 return false;
1626 }
1627 if (m_local != to.domain()) {
1628 terminate(0,m_incoming,xml,XMPPError::BadAddressing);
1629 return false;
1630 }
1631 }
1632 else if (from.domain() != m_remote.domain()) {
1633 terminate(0,m_incoming,xml,XMPPError::InvalidFrom);
1634 return false;
1635 }
1636 break;
1637 case cluster:
1638 break;
1639 default:
1640 Debug(this,DebugStub,"checkStanzaRecv() unhandled stream type=%s [%p]",
1641 typeName(),this);
1642 }
1643 return true;
1644 }
1645
1646 // Change stream state. Reset state depending data
changeState(State newState,u_int64_t time)1647 void JBStream::changeState(State newState, u_int64_t time)
1648 {
1649 if (newState == m_state)
1650 return;
1651 Debug(this,DebugAll,"Changing state from '%s' to '%s' [%p]",
1652 stateName(),lookup(newState,s_stateName),this);
1653 // Set/reset state depending data
1654 switch (m_state) {
1655 case Running:
1656 resetPing();
1657 m_pingId = "";
1658 break;
1659 case WaitStart:
1660 // Reset connect status if not timeout
1661 if (m_startTimeout && m_startTimeout > time)
1662 resetConnectStatus();
1663 m_startTimeout = 0;
1664 break;
1665 case Securing:
1666 setFlags(StreamSecured);
1667 socketSetCanRead(true);
1668 break;
1669 case Connecting:
1670 m_connectTimeout = 0;
1671 m_engine->stopConnect(toString());
1672 break;
1673 case Register:
1674 if (type() == c2s)
1675 clientStream()->m_registerReq = 0;
1676 break;
1677 default: ;
1678 }
1679 switch (newState) {
1680 case WaitStart:
1681 if (m_engine->m_setupTimeout && m_type != cluster)
1682 m_setupTimeout = time + timerMultiplier(this) * m_engine->m_setupTimeout;
1683 else
1684 m_setupTimeout = 0;
1685 m_startTimeout = time + timerMultiplier(this) * m_engine->m_startTimeout;
1686 DDebug(this,DebugAll,"Set timeouts start=" FMT64 " setup=" FMT64 " [%p]",
1687 m_startTimeout,m_setupTimeout,this);
1688 if (m_xmlDom) {
1689 Lock lck(m_socketMutex);
1690 if (m_xmlDom) {
1691 m_xmlDom->reset();
1692 DDebug(this,DebugAll,"XML parser reset [%p]",this);
1693 }
1694 }
1695 break;
1696 case Idle:
1697 m_events.clear();
1698 case Destroy:
1699 m_id = "";
1700 m_setupTimeout = 0;
1701 m_startTimeout = 0;
1702 // Reset all internal flags
1703 resetFlags(InternalFlags);
1704 if (type() == c2s)
1705 clientStream()->m_registerReq = 0;
1706 break;
1707 case Running:
1708 resetConnectStatus();
1709 setRedirect();
1710 m_redirectCount = 0;
1711 m_pingInterval = m_engine->m_pingInterval;
1712 setNextPing(true);
1713 setFlags(StreamSecured | StreamAuthenticated);
1714 resetFlags(InError);
1715 m_setupTimeout = 0;
1716 m_startTimeout = 0;
1717 if (m_state != Running)
1718 m_events.append(new JBEvent(JBEvent::Running,this,0));
1719 break;
1720 case Securing:
1721 socketSetCanRead(false);
1722 break;
1723 default: ;
1724 }
1725 m_state = newState;
1726 if (m_state == Running)
1727 setIdleTimer(time);
1728 }
1729
1730 // Check if the stream compress flag is set and compression was offered by remote party
checkCompress()1731 XmlElement* JBStream::checkCompress()
1732 {
1733 if (flag(StreamCompressed) || !flag(Compress))
1734 return 0;
1735 XMPPFeatureCompress* c = m_features.getCompress();
1736 if (!c)
1737 return 0;
1738 if (!(c && c->methods()))
1739 return 0;
1740 XmlElement* x = 0;
1741 Lock lock(m_socketMutex);
1742 m_engine->compressStream(this,c->methods());
1743 if (m_compress && m_compress->format()) {
1744 x = XMPPUtils::createElement(XmlTag::Compress,XMPPNamespace::Compress);
1745 x->addChild(XMPPUtils::createElement(XmlTag::Method,m_compress->format()));
1746 }
1747 else
1748 TelEngine::destruct(m_compress);
1749 return x;
1750 }
1751
1752 // Check for pending events. Set the last event
checkPendingEvent()1753 void JBStream::checkPendingEvent()
1754 {
1755 if (m_lastEvent)
1756 return;
1757 if (!m_terminateEvent) {
1758 GenObject* gen = m_events.remove(false);
1759 if (gen)
1760 m_lastEvent = static_cast<JBEvent*>(gen);
1761 return;
1762 }
1763 // Check for register events and raise them before the terminate event
1764 for (ObjList* o = m_events.skipNull(); o; o = o->skipNext()) {
1765 JBEvent* ev = static_cast<JBEvent*>(o->get());
1766 if (ev->type() == JBEvent::RegisterOk || ev->type() == JBEvent::RegisterFailed) {
1767 m_lastEvent = ev;
1768 m_events.remove(ev,false);
1769 return;
1770 }
1771 }
1772 m_lastEvent = m_terminateEvent;
1773 m_terminateEvent = 0;
1774 }
1775
1776 // Send pending stream XML or stanzas
sendPending(bool streamOnly)1777 bool JBStream::sendPending(bool streamOnly)
1778 {
1779 if (!m_socket)
1780 return false;
1781 XDebug(this,DebugAll,"JBStream::sendPending() [%p]",this);
1782 bool noComp = !flag(StreamCompressed);
1783 // Always try to send pending stream XML first
1784 if (m_outStreamXml) {
1785 const void* buf = 0;
1786 unsigned int len = 0;
1787 if (noComp) {
1788 buf = m_outStreamXml.c_str();
1789 len = m_outStreamXml.length();
1790 }
1791 else {
1792 buf = m_outStreamXmlCompress.data();
1793 len = m_outStreamXmlCompress.length();
1794 }
1795 if (!writeSocket(buf,len))
1796 return false;
1797 bool all = false;
1798 if (noComp) {
1799 all = (len == m_outStreamXml.length());
1800 if (all)
1801 m_outStreamXml.clear();
1802 else
1803 m_outStreamXml = m_outStreamXml.substr(len);
1804 }
1805 else {
1806 all = (len == m_outStreamXmlCompress.length());
1807 if (all) {
1808 m_outStreamXml.clear();
1809 m_outStreamXmlCompress.clear();
1810 }
1811 else
1812 m_outStreamXmlCompress.cut(-(int)len);
1813 }
1814 // Start TLS now for incoming streams
1815 if (m_incoming && m_state == Securing) {
1816 if (all) {
1817 m_engine->encryptStream(this);
1818 setFlags(StreamTls);
1819 socketSetCanRead(true);
1820 }
1821 return true;
1822 }
1823 // Check set StreamCompressed flag if all data sent
1824 if (all && flag(SetCompressed))
1825 setFlags(StreamCompressed);
1826 if (streamOnly || !all)
1827 return true;
1828 }
1829
1830 // Send pending stanzas
1831 if (m_state != Running || streamOnly)
1832 return true;
1833 ObjList* obj = m_pending.skipNull();
1834 if (!obj)
1835 return true;
1836 XmlElementOut* eout = static_cast<XmlElementOut*>(obj->get());
1837 XmlElement* xml = eout->element();
1838 if (!xml) {
1839 m_pending.remove(eout,true);
1840 return true;
1841 }
1842 bool sent = eout->sent();
1843 const void* buf = 0;
1844 unsigned int len = 0;
1845 if (noComp)
1846 buf = (const void*)eout->getData(len);
1847 else {
1848 if (!sent) {
1849 // Make sure the buffer is prepared for sending
1850 eout->getData(len);
1851 m_outXmlCompress.clear();
1852 if (!compress(eout))
1853 return false;
1854 }
1855 buf = m_outXmlCompress.data();
1856 len = m_outXmlCompress.length();
1857 }
1858 // Print the element only if it's the first time we try to send it
1859 if (!sent)
1860 m_engine->printXml(this,true,*xml);
1861 if (writeSocket(buf,len)) {
1862 if (!len)
1863 return true;
1864 setIdleTimer();
1865 // Adjust element's buffer. Remove it from list on completion
1866 unsigned int rest = 0;
1867 if (noComp) {
1868 eout->dataSent(len);
1869 rest = eout->dataCount();
1870 }
1871 else {
1872 m_outXmlCompress.cut(-(int)len);
1873 rest = m_outXmlCompress.length();
1874 }
1875 if (!rest) {
1876 DDebug(this,DebugAll,"Sent element (%p,%s) [%p]",xml,xml->tag(),this);
1877 m_pending.remove(eout,true);
1878 }
1879 else
1880 DDebug(this,DebugAll,"Partially sent element (%p,%s) sent=%u rest=%u [%p]",
1881 xml,xml->tag(),len,rest,this);
1882 return true;
1883 }
1884 // Error
1885 Debug(this,DebugNote,"Failed to send (%p,%s) [%p]",xml,xml->tag(),this);
1886 return false;
1887 }
1888
1889 // Write data to socket
writeSocket(const void * data,unsigned int & len)1890 bool JBStream::writeSocket(const void* data, unsigned int& len)
1891 {
1892 if (!(data && len)) {
1893 len = 0;
1894 return true;
1895 }
1896 Lock lock(m_socketMutex);
1897 if (!socketCanWrite()) {
1898 len = 0;
1899 if (0 != (m_socketFlags & SocketCanWrite)) {
1900 socketSetCanWrite(false);
1901 postponeTerminate(0,m_incoming,XMPPError::SocketError,"No socket");
1902 }
1903 return false;
1904 }
1905 socketSetWriting(true);
1906 lock.drop();
1907 #ifdef JBSTREAM_DEBUG_SOCKET
1908 if (!flag(StreamCompressed))
1909 Debug(this,DebugInfo,"Sending %s [%p]",(const char*)data,this);
1910 else
1911 Debug(this,DebugInfo,"Sending %u compressed bytes [%p]",len,this);
1912 #endif
1913 int w = m_socket->writeData(data,len);
1914 if (w != Socket::socketError())
1915 len = w;
1916 else
1917 len = 0;
1918 #ifdef JBSTREAM_DEBUG_SOCKET
1919 if (!flag(StreamCompressed)) {
1920 String sent((const char*)data,len);
1921 Debug(this,DebugInfo,"Sent %s [%p]",sent.c_str(),this);
1922 }
1923 else
1924 Debug(this,DebugInfo,"Sent %u compressed bytes [%p]",len,this);
1925 #endif
1926 Lock lck(m_socketMutex);
1927 // Check if the connection is waiting to be reset
1928 if (socketWaitReset()) {
1929 socketSetWriting(false);
1930 return true;
1931 }
1932 // Check if something changed
1933 if (!(m_socket && socketWriting())) {
1934 Debug(this,DebugAll,"Socket deleted while writing [%p]",this);
1935 return true;
1936 }
1937 socketSetWriting(false);
1938 if (w != Socket::socketError() || m_socket->canRetry())
1939 return true;
1940 socketSetCanWrite(false);
1941 String tmp;
1942 Thread::errorString(tmp,m_socket->error());
1943 String reason;
1944 reason << "Socket send error: " << tmp << " (" << m_socket->error() << ")";
1945 Debug(this,DebugWarn,"%s [%p]",reason.c_str(),this);
1946 lck.drop();
1947 postponeTerminate(0,m_incoming,XMPPError::SocketError,reason);
1948 return false;
1949 }
1950
1951 // Update stream flags and remote connection data from engine
updateFromRemoteDef()1952 void JBStream::updateFromRemoteDef()
1953 {
1954 m_engine->lock();
1955 JBRemoteDomainDef* domain = m_engine->remoteDomainDef(m_remote.domain());
1956 // Update flags
1957 setFlags(domain->m_flags & StreamFlags);
1958 // Update connection data
1959 if (outgoing() && state() == Idle) {
1960 m_connectAddr = domain->m_address;
1961 m_connectPort = domain->m_port;
1962 }
1963 m_engine->unlock();
1964 }
1965
1966 // Retrieve the first required feature in the list
firstRequiredFeature()1967 XMPPFeature* JBStream::firstRequiredFeature()
1968 {
1969 for (ObjList* o = m_features.skipNull(); o; o = o->skipNext()) {
1970 XMPPFeature* f = static_cast<XMPPFeature*>(o->get());
1971 if (f->required())
1972 return f;
1973 }
1974 return 0;
1975 }
1976
1977 // Drop (delete) received XML element
dropXml(XmlElement * & xml,const char * reason)1978 bool JBStream::dropXml(XmlElement*& xml, const char* reason)
1979 {
1980 if (!xml)
1981 return true;
1982 Debug(this,DebugNote,"Dropping xml=(%p,%s) ns=%s in state=%s reason='%s' [%p]",
1983 xml,xml->tag(),TelEngine::c_safe(xml->xmlns()),stateName(),reason,this);
1984 TelEngine::destruct(xml);
1985 return true;
1986 }
1987
1988 // Set stream flag mask
setFlags(int mask)1989 void JBStream::setFlags(int mask)
1990 {
1991 #ifdef XDEBUG
1992 String f;
1993 XMPPUtils::buildFlags(f,mask,s_flagName);
1994 Debug(this,DebugAll,"Setting flags 0x%X (%s) current=0x%X [%p]",
1995 mask,f.c_str(),m_flags,this);
1996 #endif
1997 m_flags |= mask;
1998 #ifdef DEBUG
1999 if (0 != (mask & StreamCompressed))
2000 Debug(this,DebugAll,"Stream is using compression [%p]",this);
2001 #endif
2002 }
2003
2004 // Reset stream flag mask
resetFlags(int mask)2005 void JBStream::resetFlags(int mask)
2006 {
2007 #ifdef XDEBUG
2008 String f;
2009 XMPPUtils::buildFlags(f,mask,s_flagName);
2010 Debug(this,DebugAll,"Resetting flags 0x%X (%s) current=0x%X [%p]",
2011 mask,f.c_str(),m_flags,this);
2012 #endif
2013 m_flags &= ~mask;
2014 }
2015
2016 // Set the idle timer in Running state
setIdleTimer(u_int64_t msecNow)2017 void JBStream::setIdleTimer(u_int64_t msecNow)
2018 {
2019 // Set only for non c2s in Running state
2020 if (m_type == c2s || m_type == cluster || m_state != Running ||
2021 !m_engine->m_idleTimeout)
2022 return;
2023 m_idleTimeout = msecNow + m_engine->m_idleTimeout;
2024 XDebug(this,DebugAll,"Idle timeout set to " FMT64 "ms [%p]",m_idleTimeout,this);
2025 }
2026
2027 // Reset ping data
resetPing()2028 void JBStream::resetPing()
2029 {
2030 if (!(m_pingTimeout || m_nextPing))
2031 return;
2032 XDebug(this,DebugAll,"Reset ping data [%p]",this);
2033 m_nextPing = 0;
2034 m_pingTimeout = 0;
2035 }
2036
2037 // Set the time of the next ping if there is any timeout and we don't have a ping in progress
2038 // @return XmlElement containing the ping to send, 0 if no ping is going to be sent or 'force' is true
setNextPing(bool force)2039 XmlElement* JBStream::setNextPing(bool force)
2040 {
2041 if (!m_pingInterval) {
2042 resetPing();
2043 return 0;
2044 }
2045 if (m_type != c2s && m_type != comp)
2046 return 0;
2047 if (force) {
2048 m_nextPing = Time::msecNow() + m_pingInterval;
2049 m_pingTimeout = 0;
2050 XDebug(this,DebugAll,"Next ping " FMT64U " [%p]",m_nextPing,this);
2051 return 0;
2052 }
2053 XmlElement* ping = 0;
2054 if (m_nextPing) {
2055 // Ping still active in engine ?
2056 Time time;
2057 if (m_nextPing > time.msec())
2058 return 0;
2059 if (m_engine->m_pingTimeout) {
2060 generateIdIndex(m_pingId,"_ping_");
2061 ping = buildPing(m_pingId);
2062 if (ping)
2063 m_pingTimeout = time.msec() + m_engine->m_pingTimeout;
2064 else
2065 m_pingTimeout = 0;
2066 }
2067 else
2068 resetPing();
2069 }
2070 if (m_pingInterval)
2071 m_nextPing = Time::msecNow() + m_pingInterval;
2072 else
2073 m_nextPing = 0;
2074 XDebug(this,DebugAll,"Next ping " FMT64U " ping=%p [%p]",m_nextPing,ping,this);
2075 return ping;
2076 }
2077
2078 // Process incoming elements in Challenge state
2079 // Return false if stream termination was initiated
processChallenge(XmlElement * xml,const JabberID & from,const JabberID & to)2080 bool JBStream::processChallenge(XmlElement* xml, const JabberID& from, const JabberID& to)
2081 {
2082 int t, n;
2083 if (!XMPPUtils::getTag(*xml,t,n))
2084 return dropXml(xml,"failed to retrieve element tag");
2085 if (n != XMPPNamespace::Sasl)
2086 return dropXml(xml,"expecting sasl namespace");
2087 if (t == XmlTag::Abort) {
2088 TelEngine::destruct(xml);
2089 TelEngine::destruct(m_sasl);
2090 XmlElement* rsp = XMPPUtils::createFailure(XMPPNamespace::Sasl,XMPPError::Aborted);
2091 sendStreamXml(Features,rsp);
2092 return true;
2093 }
2094 if (t != XmlTag::Response) {
2095 dropXml(xml,"expecting sasl response");
2096 return true;
2097 }
2098 XMPPError::Type error = XMPPError::NoError;
2099 // Use a while() to set error and break to the end
2100 while (true) {
2101 // Decode non empty auth data
2102 const String& text = xml->getText();
2103 if (text) {
2104 String tmp;
2105 if (!decodeBase64(tmp,text,this)) {
2106 error = XMPPError::IncorrectEnc;
2107 break;
2108 }
2109 if (m_sasl && !m_sasl->parseMD5ChallengeRsp(tmp)) {
2110 error = XMPPError::MalformedRequest;
2111 break;
2112 }
2113 }
2114 else if (m_sasl)
2115 TelEngine::destruct(m_sasl->m_params);
2116 break;
2117 }
2118 if (error == XMPPError::NoError) {
2119 changeState(Auth);
2120 m_events.append(new JBEvent(JBEvent::Auth,this,xml,from,to));
2121 }
2122 else {
2123 Debug(this,DebugNote,"Received bad challenge response error='%s' [%p]",
2124 XMPPUtils::s_error[error].c_str(),this);
2125 XmlElement* failure = XMPPUtils::createFailure(XMPPNamespace::Sasl,error);
2126 sendStreamXml(Features,failure);
2127 TelEngine::destruct(xml);
2128 }
2129 return true;
2130 }
2131
2132 // Process incoming 'auth' elements qualified by SASL namespace
2133 // Return false if stream termination was initiated
processSaslAuth(XmlElement * xml,const JabberID & from,const JabberID & to)2134 bool JBStream::processSaslAuth(XmlElement* xml, const JabberID& from, const JabberID& to)
2135 {
2136 if (!xml)
2137 return true;
2138 if (!XMPPUtils::isTag(*xml,XmlTag::Auth,XMPPNamespace::Sasl))
2139 return dropXml(xml,"expecting 'auth' in sasl namespace");
2140 XMPPFeatureSasl* sasl = m_features.getSasl();
2141 TelEngine::destruct(m_sasl);
2142 XMPPError::Type error = XMPPError::NoError;
2143 const char* mName = xml->attribute("mechanism");
2144 int mech = XMPPUtils::authMeth(mName);
2145 // Use a while() to set error and break to the end
2146 while (true) {
2147 if (!sasl->mechanism(mech)) {
2148 error = XMPPError::InvalidMechanism;
2149 break;
2150 }
2151 if (mech == XMPPUtils::AuthMD5) {
2152 // Ignore auth text: we will challenge the client
2153 m_sasl = new SASL(false,m_local.domain());
2154 String buf;
2155 if (m_sasl->buildMD5Challenge(buf)) {
2156 XDebug(this,DebugAll,"Sending challenge=%s [%p]",buf.c_str(),this);
2157 Base64 b((void*)buf.c_str(),buf.length());
2158 b.encode(buf);
2159 XmlElement* chg = XMPPUtils::createElement(XmlTag::Challenge,
2160 XMPPNamespace::Sasl,buf);
2161 if (!sendStreamXml(Challenge,chg)) {
2162 TelEngine::destruct(xml);
2163 return false;
2164 }
2165 }
2166 else {
2167 TelEngine::destruct(m_sasl);
2168 error = XMPPError::TempAuthFailure;
2169 break;
2170 }
2171 }
2172 else if (mech == XMPPUtils::AuthPlain) {
2173 // Decode non empty auth data
2174 DataBlock d;
2175 const String& text = xml->getText();
2176 if (text && text != "=" && !decodeBase64(d,text)) {
2177 error = XMPPError::IncorrectEnc;
2178 break;
2179 }
2180 m_sasl = new SASL(true);
2181 if (!m_sasl->parsePlain(d)) {
2182 error = XMPPError::MalformedRequest;
2183 break;
2184 }
2185 }
2186 else {
2187 // This should never happen: we don't handle a mechanism sent
2188 // to the remote party!
2189 Debug(this,DebugWarn,"Unhandled advertised auth mechanism='%s' [%p]",
2190 mName,this);
2191 error = XMPPError::TempAuthFailure;
2192 break;
2193 }
2194 break;
2195 }
2196 if (error == XMPPError::NoError) {
2197 // Challenge state: we've challenged the remote party
2198 // Otherwise: request auth from upper layer
2199 if (state() == Challenge)
2200 TelEngine::destruct(xml);
2201 else {
2202 changeState(Auth);
2203 m_events.append(new JBEvent(JBEvent::Auth,this,xml,from,to));
2204 }
2205 }
2206 else {
2207 Debug(this,DebugNote,"Received auth request mechanism='%s' error='%s' [%p]",
2208 mName,XMPPUtils::s_error[error].c_str(),this);
2209 XmlElement* failure = XMPPUtils::createFailure(XMPPNamespace::Sasl,error);
2210 sendStreamXml(m_state,failure);
2211 TelEngine::destruct(xml);
2212 }
2213 return true;
2214 }
2215
// Process received elements in Features state (incoming stream)
// xml: the received element. Every path consumes it (dropped, destroyed,
//  used to build a response, attached to an event or passed on)
// from, to: the addresses passed to generated events / further processing
// Outline:
//  - component streams: only the handshake is accepted (Auth event)
//  - element not matching an advertised feature: handle the 'iq' based
//    negotiation (bind, register, non SASL auth) for c2s, dialback result
//    for s2s, otherwise move to Running if no required feature is left
//  - element matching an advertised feature: TLS, SASL or compression
// Return false if stream termination was initiated
bool JBStream::processFeaturesIn(XmlElement* xml, const JabberID& from, const JabberID& to)
{
    if (!xml)
        return true;
    const String* t = 0;
    const String* nsName = 0;
    if (!xml->getTag(t,nsName))
        return dropXml(xml,"invalid tag namespace prefix");
    // Namespace index: XMPPNamespace::Count is used when the element has no namespace
    int ns = nsName ? XMPPUtils::s_ns[*nsName] : XMPPNamespace::Count;

    // Component: Waiting for handshake in the stream namespace
    if (type() == comp) {
        if (outgoing())
            return dropXml(xml,"invalid state for incoming stream");
        if (*t != XMPPUtils::s_tag[XmlTag::Handshake] || ns != m_xmlns)
            return dropXml(xml,"expecting handshake in stream's namespace");
        // Let the upper layer check the handshake text
        JBEvent* ev = new JBEvent(JBEvent::Auth,this,xml,from,to);
        ev->m_text = xml->getText();
        m_events.append(ev);
        changeState(Auth);
        return true;
    }

    XMPPFeature* f = 0;
    // Stream compression feature and compression namespace are not the same!
    if (ns != XMPPNamespace::Compress)
        f = m_features.get(ns);
    else
        f = m_features.get(XMPPNamespace::CompressFeature);

    // Check if received unexpected feature
    if (!f) {
        // Check for some features that can be negotiated via 'iq' elements
        if (m_type == c2s && *t == XMPPUtils::s_tag[XmlTag::Iq] && ns == m_xmlns) {
            // Tag and namespace of the first child (Count if there is no child)
            int chTag = XmlTag::Count;
            int chNs = XMPPNamespace::Count;
            XmlElement* child = xml->findFirstChild();
            if (child)
                XMPPUtils::getTag(*child,chTag,chNs);
            // Bind
            if (chNs == XMPPNamespace::Bind && m_features.get(XMPPNamespace::Bind)) {
                // We've sent bind feature
                // Don't accept it if not authenticated and TLS/SASL must be negotiated
                if (!flag(StreamAuthenticated)) {
                    XMPPFeature* tls = m_features.get(XMPPNamespace::Tls);
                    if (tls && tls->required()) {
                        XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeWait,
                            XMPPError::EncryptionRequired);
                        sendStreamXml(m_state,e);
                        return true;
                    }
                    XMPPFeature* sasl = m_features.get(XMPPNamespace::Sasl);
                    XMPPFeature* iqAuth = m_features.get(XMPPNamespace::IqAuth);
                    if ((sasl && sasl->required()) || (iqAuth && iqAuth->required())) {
                        XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeAuth,
                            XMPPError::NotAllowed);
                        sendStreamXml(m_state,e);
                        return true;
                    }
                }
                // Remove TLS/SASL features from list: they can't be negotiated anymore
                setFlags(StreamSecured | StreamAuthenticated);
                m_features.remove(XMPPNamespace::Tls);
                m_features.remove(XMPPNamespace::Sasl);
                m_features.remove(XMPPNamespace::IqAuth);
                changeState(Running);
                return processRunning(xml,from,to);
            }
            else if (chNs == XMPPNamespace::IqRegister) {
                // Register
                m_events.append(new JBEvent(JBEvent::Iq,this,xml,xml->findFirstChild()));
                return true;
            }
            else if (chNs == XMPPNamespace::IqAuth) {
                XMPPUtils::IqType type = XMPPUtils::iqType(xml->attribute("type"));
                // Requests (get/set) are answered with an error, anything else is dropped
                bool req = type == XMPPUtils::IqGet || type == XMPPUtils::IqSet;
                // Stream non SASL auth
                // Check if we support it
                if (!m_features.get(XMPPNamespace::IqAuth)) {
                    if (req) {
                        XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeCancel,
                            XMPPError::NotAllowed);
                        return sendStreamXml(m_state,e);
                    }
                    return dropXml(xml,"unexpected jabber:iq:auth element");
                }
                // Required TLS is enforced only when the StreamRemoteVer1 flag is set
                if (flag(StreamRemoteVer1)) {
                    XMPPFeature* tls = m_features.get(XMPPNamespace::Tls);
                    if (tls && tls->required()) {
                        XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeWait,
                            XMPPError::EncryptionRequired);
                        sendStreamXml(m_state,e);
                        return true;
                    }
                }
                if (chTag != XmlTag::Query) {
                    if (req) {
                        XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeModify,
                            XMPPError::FeatureNotImpl);
                        sendStreamXml(m_state,e);
                        return true;
                    }
                    return dropXml(xml,"expecting iq auth with 'query' child");
                }
                // Send it to the upper layer
                // 'set' is an authentication request, anything else is a plain iq
                if (type == XMPPUtils::IqSet) {
                    m_events.append(new JBEvent(JBEvent::Auth,this,xml,xml->findFirstChild()));
                    changeState(Auth);
                }
                else
                    m_events.append(new JBEvent(JBEvent::Iq,this,xml,xml->findFirstChild()));
                return true;
            }
        }
        // s2s waiting for dialback
        if (m_type == s2s) {
            if (isDbResult(*xml))
                return serverStream()->processDbResult(xml,from,to);
            // Drop the element if not authenticated
            if (!flag(StreamAuthenticated))
                return dropXml(xml,"expecting dialback result");
        }
        // Check if all remaining features are optional
        XMPPFeature* req = firstRequiredFeature();
        if (req) {
            Debug(this,DebugInfo,
                "Received '%s' while having '%s' required feature not negotiated [%p]",
                xml->tag(),req->c_str(),this);
            // TODO: terminate the stream?
            return dropXml(xml,"required feature negotiation not completed");
        }
        // No more required features: change state to Running
        // Remove TLS/SASL features from list: they can't be negotiated anymore
        setFlags(StreamSecured | StreamAuthenticated);
        m_features.remove(XMPPNamespace::Tls);
        m_features.remove(XMPPNamespace::Sasl);
        changeState(Running);
        return processRunning(xml,from,to);
    }
    // Stream encryption
    if (ns == XMPPNamespace::Tls) {
        if (*t != XMPPUtils::s_tag[XmlTag::Starttls])
            return dropXml(xml,"expecting tls 'starttls' element");
        if (!flag(StreamSecured)) {
            // Change state before trying to send the element
            // to signal to sendPending() to encrypt the stream after sending it
            changeState(Securing);
            sendStreamXml(WaitStart,
                XMPPUtils::createElement(XmlTag::Proceed,XMPPNamespace::Tls));
        }
        else {
            Debug(this,DebugNote,"Received '%s' element while already secured [%p]",
                xml->tag(),this);
            // We shouldn't have Starttls in features list
            // Something went wrong: terminate the stream
            terminate(0,true,xml,XMPPError::Internal,"Stream already secured");
            return false;
        }
        TelEngine::destruct(xml);
        return true;
    }
    // Stream SASL auth
    if (ns == XMPPNamespace::Sasl) {
        if (*t != XMPPUtils::s_tag[XmlTag::Auth])
            return dropXml(xml,"expecting sasl 'auth' element");
        if (!flag(StreamAuthenticated)) {
            // Check if we must negotiate TLS before authentication
            XMPPFeature* tls = m_features.get(XMPPNamespace::Tls);
            if (tls) {
                if (!flag(StreamSecured) && tls->required()) {
                    TelEngine::destruct(xml);
                    XmlElement* failure = XMPPUtils::createFailure(XMPPNamespace::Sasl,
                        XMPPError::EncryptionRequired);
                    sendStreamXml(m_state,failure);
                    return true;
                }
                // TLS feature still listed but not refused above: call setSecured()
                setSecured();
            }
        }
        else {
            // Remote party requested authentication while already done:
            // Reset our flag and let it authenticate again
            Debug(this,DebugNote,
                "Received auth request while already authenticated [%p]",
                this);
            resetFlags(StreamAuthenticated);
        }
        return processSaslAuth(xml,from,to);
    }
    // Stream compression
    if (ns == XMPPNamespace::Compress) {
        if (*t != XMPPUtils::s_tag[XmlTag::Compress])
            return dropXml(xml,"expecting stream compression 'compress' element");
        return handleCompressReq(xml);
    }
    return dropXml(xml,"unhandled stream feature");
}
2415
2416 // Process received elements in Features state (outgoing stream)
2417 // Return false if stream termination was initiated
processFeaturesOut(XmlElement * xml,const JabberID & from,const JabberID & to)2418 bool JBStream::processFeaturesOut(XmlElement* xml, const JabberID& from,
2419 const JabberID& to)
2420 {
2421 if (!xml)
2422 return true;
2423 if (!XMPPUtils::isTag(*xml,XmlTag::Features,XMPPNamespace::Stream))
2424 return dropXml(xml,"expecting stream features");
2425 m_features.fromStreamFeatures(*xml);
2426 // Check TLS
2427 if (!flag(StreamSecured)) {
2428 XMPPFeature* tls = m_features.get(XMPPNamespace::Tls);
2429 if (tls) {
2430 if (m_engine->hasClientTls()) {
2431 TelEngine::destruct(xml);
2432 XmlElement* x = XMPPUtils::createElement(XmlTag::Starttls,
2433 XMPPNamespace::Tls);
2434 return sendStreamXml(WaitTlsRsp,x);
2435 }
2436 if (tls->required() || flag(TlsRequired))
2437 return destroyDropXml(xml,XMPPError::Internal,
2438 "required encryption not available");
2439 }
2440 else if (flag(TlsRequired))
2441 return destroyDropXml(xml,XMPPError::EncryptionRequired,
2442 "required encryption not supported by remote");
2443 setFlags(StreamSecured);
2444 }
2445 // Check auth
2446 if (!flag(StreamAuthenticated)) {
2447 JBServerStream* server = serverStream();
2448 if (server) {
2449 TelEngine::destruct(xml);
2450 return server->sendDialback();
2451 }
2452 JBClientStream* client = clientStream();
2453 if (client) {
2454 // Start auth or request registration data
2455 TelEngine::destruct(xml);
2456 if (!flag(RegisterUser))
2457 return client->startAuth();
2458 return client->requestRegister(false);
2459 }
2460 }
2461 // Check compression
2462 XmlElement* x = checkCompress();
2463 if (x) {
2464 TelEngine::destruct(xml);
2465 return sendStreamXml(Compressing,x);
2466 }
2467 JBClientStream* client = clientStream();
2468 if (client) {
2469 TelEngine::destruct(xml);
2470 return client->bind();
2471 }
2472 JBServerStream* server = serverStream();
2473 JBClusterStream* cluster = clusterStream();
2474 if (server || cluster) {
2475 TelEngine::destruct(xml);
2476 changeState(Running);
2477 return true;
2478 }
2479 return dropXml(xml,"incomplete features process for outgoing stream");
2480 }
2481
2482 // Process received elements in WaitTlsRsp state (outgoing stream)
2483 // The element will be consumed
2484 // Return false if stream termination was initiated
processWaitTlsRsp(XmlElement * xml,const JabberID & from,const JabberID & to)2485 bool JBStream::processWaitTlsRsp(XmlElement* xml, const JabberID& from,
2486 const JabberID& to)
2487 {
2488 if (!xml)
2489 return true;
2490 int t,n;
2491 const char* reason = 0;
2492 if (XMPPUtils::getTag(*xml,t,n)) {
2493 if (n == XMPPNamespace::Tls) {
2494 // Accept proceed and failure
2495 if (t != XmlTag::Proceed && t != XmlTag::Failure)
2496 reason = "expecting tls 'proceed' or 'failure'";
2497 }
2498 else
2499 reason = "expecting tls namespace";
2500 }
2501 else
2502 reason = "failed to retrieve element tag";
2503 if (reason) {
2504 // TODO: Unacceptable response to starttls request
2505 // Restart socket read or terminate the stream ?
2506 socketSetCanRead(true);
2507 return dropXml(xml,reason);
2508 }
2509 if (t == XmlTag::Proceed) {
2510 TelEngine::destruct(xml);
2511 changeState(Securing);
2512 m_engine->encryptStream(this);
2513 socketSetCanRead(true);
2514 setFlags(StreamTls);
2515 XmlElement* s = buildStreamStart();
2516 return sendStreamXml(WaitStart,s);
2517 }
2518 // TODO: Implement TLS usage reset if the stream is going to re-connect
2519 terminate(1,false,xml,XMPPError::NoError,"Server can't start TLS");
2520 return false;
2521 }
2522
2523 // Set stream namespace from type
setXmlns()2524 void JBStream::setXmlns()
2525 {
2526 switch (m_type) {
2527 case c2s:
2528 m_xmlns = XMPPNamespace::Client;
2529 break;
2530 case s2s:
2531 m_xmlns = XMPPNamespace::Server;
2532 break;
2533 case comp:
2534 m_xmlns = XMPPNamespace::ComponentAccept;
2535 break;
2536 case cluster:
2537 m_xmlns = XMPPNamespace::YateCluster;
2538 break;
2539 }
2540 }
2541
2542 // Event termination notification
eventTerminated(const JBEvent * ev)2543 void JBStream::eventTerminated(const JBEvent* ev)
2544 {
2545 if (ev && ev == m_lastEvent) {
2546 m_lastEvent = 0;
2547 XDebug(this,DebugAll,"Event (%p,%s) terminated [%p]",ev,ev->name(),this);
2548 }
2549 }
2550
2551 // Compress data to be sent (the pending stream xml buffer or pending stanza)
2552 // Return false on failure
compress(XmlElementOut * xml)2553 bool JBStream::compress(XmlElementOut* xml)
2554 {
2555 DataBlock& buf = xml ? m_outXmlCompress : m_outStreamXmlCompress;
2556 const String& xmlBuf = xml ? xml->buffer() : m_outStreamXml;
2557 m_socketMutex.lock();
2558 int res = m_compress ? m_compress->compress(xmlBuf.c_str(),xmlBuf.length(),buf) : -1000;
2559 m_socketMutex.unlock();
2560 const char* s = xml ? "pending" : "stream";
2561 if (res >= 0) {
2562 if ((unsigned int)res == xmlBuf.length()) {
2563 #ifdef JBSTREAM_DEBUG_COMPRESS
2564 Debug(this,DebugInfo,"Compressed %s xml %u --> %u [%p]",
2565 s,xmlBuf.length(),buf.length(),this);
2566 #endif
2567 return true;
2568 }
2569 Debug(this,DebugNote,"Partially compressed %s xml %d/%u [%p]",
2570 s,res,xmlBuf.length(),this);
2571 }
2572 else
2573 Debug(this,DebugNote,"Failed to compress %s xml: %d [%p]",s,res,this);
2574 return false;
2575 }
2576
2577 // Reset connect status data
resetConnectStatus()2578 void JBStream::resetConnectStatus()
2579 {
2580 DDebug(this,DebugAll,"resetConnectStatus() [%p]",this);
2581 m_connectStatus = JBConnect::Start;
2582 m_connectSrvs.clear();
2583 }
2584
2585 // Postpone stream terminate until all parsed elements are processed
2586 // Terminate now if allowed
postponeTerminate(int location,bool destroy,int error,const char * reason)2587 void JBStream::postponeTerminate(int location, bool destroy, int error, const char* reason)
2588 {
2589 lock();
2590 XDebug(this,DebugAll,"postponeTerminate(%d,%u,%s,%s) state=%s [%p]",
2591 location,destroy,XMPPUtils::s_error[error].c_str(),reason,stateName(),this);
2592 if (!m_ppTerminate) {
2593 int interval = 0;
2594 if (type() == c2s)
2595 interval = m_engine->m_pptTimeoutC2s;
2596 else
2597 interval = m_engine->m_pptTimeout;
2598 if (interval && haveData()) {
2599 m_ppTerminate = new NamedList("");
2600 m_ppTerminate->addParam("location",String(location));
2601 m_ppTerminate->addParam("destroy",String::boolText(destroy));
2602 m_ppTerminate->addParam("error",String(error));
2603 m_ppTerminate->addParam("reason",reason);
2604 m_ppTerminateTimeout = Time::msecNow() + interval;
2605 Debug(this,DebugInfo,
2606 "Postponed termination location=%d destroy=%u error=%s reason=%s interval=%us [%p]",
2607 location,destroy,XMPPUtils::s_error[error].c_str(),reason,interval,this);
2608 }
2609 }
2610 bool postponed = m_ppTerminate != 0;
2611 unlock();
2612 if (!postponed)
2613 terminate(location,destroy,0,error,reason);
2614 }
2615
2616 // Handle postponed termination. Return true if found
postponedTerminate()2617 bool JBStream::postponedTerminate()
2618 {
2619 if (!m_ppTerminate)
2620 return false;
2621 int location = m_ppTerminate->getIntValue("location");
2622 bool destroy = m_ppTerminate->getBoolValue("destroy");
2623 int error = m_ppTerminate->getIntValue("error");
2624 String reason = m_ppTerminate->getValue("reason");
2625 resetPostponedTerminate();
2626 DDebug(this,DebugAll,"postponedTerminate(%d,%u,%s,%s) state=%s [%p]",
2627 location,destroy,XMPPUtils::s_error[error].c_str(),reason.c_str(),
2628 stateName(),this);
2629 terminate(location,destroy,0,error,reason);
2630 return true;
2631 }
2632
2633 // Reset redirect data
setRedirect(const String & addr,int port)2634 void JBStream::setRedirect(const String& addr, int port)
2635 {
2636 if (!addr) {
2637 if (m_redirectAddr)
2638 Debug(this,DebugInfo,"Cleared redirect data [%p]",this);
2639 m_redirectAddr = "";
2640 m_redirectPort = 0;
2641 return;
2642 }
2643 if (m_redirectCount >= m_redirectMax) {
2644 setRedirect();
2645 return;
2646 }
2647 resetConnectStatus();
2648 m_redirectAddr = addr;
2649 m_redirectPort = port;
2650 m_redirectCount++;
2651 Debug(this,DebugInfo,"Set redirect to '%s:%d' in state %s (counter=%u max=%u) [%p]",
2652 m_redirectAddr.c_str(),m_redirectPort,stateName(),m_redirectCount,m_redirectMax,this);
2653 }
2654
2655
2656 /*
2657 * JBClientStream
2658 */
JBClientStream(JBEngine * engine,Socket * socket,bool ssl)2659 JBClientStream::JBClientStream(JBEngine* engine, Socket* socket, bool ssl)
2660 : JBStream(engine,socket,c2s,ssl),
2661 m_userData(0), m_registerReq(0)
2662 {
2663 }
2664
JBClientStream(JBEngine * engine,const JabberID & jid,const String & account,const NamedList & params,const char * name,const char * serverHost)2665 JBClientStream::JBClientStream(JBEngine* engine, const JabberID& jid, const String& account,
2666 const NamedList& params, const char* name, const char* serverHost)
2667 : JBStream(engine,c2s,jid,jid.domain(),TelEngine::null(name) ? account.c_str() : name,
2668 ¶ms,serverHost),
2669 m_account(account), m_userData(0), m_registerReq(0)
2670 {
2671 m_password = params.getValue("password");
2672 }
2673
2674 // Build a ping iq stanza
buildPing(const String & stanzaId)2675 XmlElement* JBClientStream::buildPing(const String& stanzaId)
2676 {
2677 return XMPPUtils::createPing(stanzaId);
2678 }
2679
2680 // Bind a resource to an incoming stream
bind(const String & resource,const char * id,XMPPError::Type error)2681 void JBClientStream::bind(const String& resource, const char* id, XMPPError::Type error)
2682 {
2683 DDebug(this,DebugAll,"bind(%s,'%s') [%p]",resource.c_str(),
2684 XMPPUtils::s_error[error].c_str(),this);
2685 Lock lock(this);
2686 if (!incoming() || m_remote.resource())
2687 return;
2688 XmlElement* xml = 0;
2689 if (resource) {
2690 m_remote.resource(resource);
2691 xml = XMPPUtils::createIq(XMPPUtils::IqResult,0,0,id);
2692 XmlElement* bind = XMPPUtils::createElement(XmlTag::Bind,
2693 XMPPNamespace::Bind);
2694 bind->addChild(XMPPUtils::createElement(XmlTag::Jid,m_remote));
2695 xml->addChild(bind);
2696 }
2697 else {
2698 if (error == XMPPError::NoError)
2699 error = XMPPError::NotAllowed;
2700 xml = XMPPUtils::createError(XMPPError::TypeModify,error);
2701 }
2702 // Remove non-negotiable bind feature on success
2703 if (sendStanza(xml) && resource)
2704 m_features.remove(XMPPNamespace::Bind);
2705 }
2706
2707 // Request account setup (or info) on outgoing stream
requestRegister(bool data,bool set,const String & newPass)2708 bool JBClientStream::requestRegister(bool data, bool set, const String& newPass)
2709 {
2710 if (incoming())
2711 return true;
2712
2713 Lock lock(this);
2714 DDebug(this,DebugAll,"requestRegister(%u,%u) [%p]",data,set,this);
2715 XmlElement* req = 0;
2716 if (data) {
2717 // Register new user, change the account or remove it
2718 if (set) {
2719 // TODO: Allow user account register/change through unsecured streams ?
2720 String* pass = 0;
2721 if (!flag(StreamAuthenticated))
2722 pass = &m_password;
2723 else if (newPass) {
2724 m_newPassword = newPass;
2725 pass = &m_newPassword;
2726 }
2727 if (!pass)
2728 return false;
2729 m_registerReq = '2';
2730 req = XMPPUtils::createRegisterQuery(0,0,String(m_registerReq),
2731 m_local.node(),*pass);
2732 }
2733 else if (flag(StreamAuthenticated)) {
2734 m_registerReq = '3';
2735 req = XMPPUtils::createRegisterQuery(XMPPUtils::IqSet,0,0,
2736 String(m_registerReq),XMPPUtils::createElement(XmlTag::Remove));
2737 }
2738 else
2739 return false;
2740 }
2741 else {
2742 // Request register info
2743 m_registerReq = '1';
2744 req = XMPPUtils::createRegisterQuery(XMPPUtils::IqGet,0,0,String(m_registerReq));
2745 }
2746 if (!flag(StreamAuthenticated) || state() != Running)
2747 return sendStreamXml(Register,req);
2748 return sendStanza(req);
2749 }
2750
2751 // Process elements in Running state
processRunning(XmlElement * xml,const JabberID & from,const JabberID & to)2752 bool JBClientStream::processRunning(XmlElement* xml, const JabberID& from, const JabberID& to)
2753 {
2754 if (!xml)
2755 return true;
2756 // Check if a resource was bound to an incoming stream
2757 // Accept only 'iq' with bind namespace only if we've sent 'bind' feature
2758 if (incoming()) {
2759 if (!m_remote.resource()) {
2760 if (XMPPUtils::isTag(*xml,XmlTag::Iq,m_xmlns)) {
2761 XmlElement* child = XMPPUtils::findFirstChild(*xml,XmlTag::Bind,XMPPNamespace::Bind);
2762 if (child && m_features.get(XMPPNamespace::Bind)) {
2763 m_events.append(new JBEvent(JBEvent::Bind,this,xml,from,to,child));
2764 return true;
2765 }
2766 }
2767 XmlElement* e = XMPPUtils::createError(xml,XMPPError::TypeCancel,
2768 XMPPError::NotAllowed,"No resource bound to the stream");
2769 sendStanza(e);
2770 return true;
2771 }
2772 }
2773 else if (m_registerReq && XMPPUtils::isTag(*xml,XmlTag::Iq,m_xmlns) &&
2774 isRegisterId(*xml) && XMPPUtils::isResponse(*xml))
2775 return processRegister(xml,from,to);
2776 return JBStream::processRunning(xml,from,to);
2777 }
2778
2779 // Process received elements in WaitStart state
2780 // WaitStart: Incoming: waiting for stream start
2781 // Outgoing: idem (our stream start was already sent)
2782 // Return false if stream termination was initiated
processStart(const XmlElement * xml,const JabberID & from,const JabberID & to)2783 bool JBClientStream::processStart(const XmlElement* xml, const JabberID& from,
2784 const JabberID& to)
2785 {
2786 XDebug(this,DebugAll,"JBClientStream::processStart(%s) [%p]",xml->tag(),this);
2787
2788 // Check element
2789 if (!processStreamStart(xml))
2790 return false;
2791
2792 // RFC3920 5.3.1:
2793 // The 'from' attribute must be set for response stream start
2794 if (outgoing()) {
2795 if (from.null()) {
2796 Debug(this,DebugNote,"Received '%s' with empty 'from' [%p]",xml->tag(),this);
2797 terminate(0,false,0,XMPPError::BadAddressing,"Missing 'from' attribute");
2798 return false;
2799 }
2800 }
2801 else {
2802 if (!flag(StreamAuthenticated)) {
2803 m_remote.set(from);
2804 m_local.set(to);
2805 }
2806 }
2807 m_remote.resource("");
2808 // RFC3920 5.3.1: The 'to' attribute must always be set
2809 // RFC3920: The 'to' attribute is optional
2810 bool validTo = !to.null();
2811 if (validTo) {
2812 if (outgoing())
2813 validTo = (m_local.bare() == to);
2814 else
2815 validTo = engine()->hasDomain(to.domain());
2816 }
2817 #ifdef RFC3920
2818 else
2819 validTo = outgoing();
2820 #endif
2821 if (!validTo) {
2822 Debug(this,DebugNote,"Received '%s' with invalid to='%s' [%p]",
2823 xml->tag(),to.c_str(),this);
2824 terminate(0,false,0,
2825 to.null() ? XMPPError::BadAddressing : XMPPError::HostUnknown,
2826 "Invalid 'to' attribute");
2827 return false;
2828 }
2829 if (incoming() || flag(StreamRemoteVer1)) {
2830 m_events.append(new JBEvent(JBEvent::Start,this,0,from,to));
2831 return true;
2832 }
2833 Debug(this,DebugNote,"Outgoing client stream: unsupported remote version (expecting 1.x)");
2834 terminate(0,true,0,XMPPError::Internal,"Unsupported version");
2835 return false;
2836 }
2837
2838 // Process elements in Auth state
processAuth(XmlElement * xml,const JabberID & from,const JabberID & to)2839 bool JBClientStream::processAuth(XmlElement* xml, const JabberID& from,
2840 const JabberID& to)
2841 {
2842 if (!xml)
2843 return true;
2844 if (incoming())
2845 return destroyDropXml(xml,XMPPError::Internal,"invalid state for incoming stream");
2846 int t,n;
2847 if (!XMPPUtils::getTag(*xml,t,n))
2848 return destroyDropXml(xml,XMPPError::Internal,"failed to retrieve element tag");
2849
2850 // Authenticating
2851 if (!flag(StreamAuthenticated)) {
2852 // TODO: The server might challenge us again
2853 // Implement support for multiple challenge/response steps
2854 if (n != XMPPNamespace::Sasl)
2855 return destroyDropXml(xml,XMPPError::InvalidNamespace,
2856 "element with non SASL namespace");
2857 if (!m_sasl)
2858 return destroyDropXml(xml,XMPPError::Internal,"no SASL data");
2859 if (t == XmlTag::Failure) {
2860 terminate(0,true,xml);
2861 return false;
2862 }
2863 if (!m_sasl->m_plain) {
2864 // Digest MD5
2865 if (flag(StreamWaitChallenge)) {
2866 if (t != XmlTag::Challenge)
2867 return destroyDropXml(xml,XMPPError::BadRequest,"expecting challenge");
2868 String tmp;
2869 if (!decodeBase64(tmp,xml->getText(),this))
2870 return destroyDropXml(xml,XMPPError::IncorrectEnc,
2871 "challenge with incorrect encoding");
2872 if (!m_sasl->parseMD5Challenge(tmp))
2873 return destroyDropXml(xml,XMPPError::MalformedRequest,
2874 "invalid challenge format");
2875 TelEngine::destruct(xml);
2876 m_sasl->setAuthParams(m_local.node(),m_password);
2877 tmp.clear();
2878 m_sasl->buildAuthRsp(tmp,"xmpp/" + m_local.domain());
2879 resetFlags(StreamWaitChallenge);
2880 setFlags(StreamWaitChgRsp);
2881 XmlElement* rsp = XMPPUtils::createElement(XmlTag::Response,XMPPNamespace::Sasl,tmp);
2882 return sendStreamXml(state(),rsp);
2883 }
2884 // Digest MD5 response reply
2885 if (flag(StreamWaitChgRsp)) {
2886 #ifdef RFC3920
2887 // Expect success or challenge
2888 // challenge is accepted if not already received one
2889 if (t != XmlTag::Success && (t != XmlTag::Challenge || flag(StreamRfc3920Chg)))
2890 #else
2891 // Expect success
2892 if (t != XmlTag::Success)
2893 #endif
2894 return dropXml(xml,"unexpected element");
2895 if (!flag(StreamRfc3920Chg)) {
2896 String rspAuth;
2897 if (!decodeBase64(rspAuth,xml->getText(),this))
2898 return destroyDropXml(xml,XMPPError::IncorrectEnc,
2899 "challenge response reply with incorrect encoding");
2900 if (!rspAuth.startSkip("rspauth=",false))
2901 return destroyDropXml(xml,XMPPError::BadFormat,
2902 "invalid challenge response reply");
2903 if (!m_sasl->validAuthReply(rspAuth))
2904 return destroyDropXml(xml,XMPPError::InvalidAuth,
2905 "incorrect challenge response reply auth");
2906 }
2907 #ifdef RFC3920
2908 // Send empty response to challenge
2909 if (t == XmlTag::Challenge) {
2910 setFlags(StreamRfc3920Chg);
2911 TelEngine::destruct(xml);
2912 XmlElement* rsp = XMPPUtils::createElement(XmlTag::Response,
2913 XMPPNamespace::Sasl);
2914 return sendStreamXml(state(),rsp);
2915 }
2916 #endif
2917 resetFlags(StreamWaitChgRsp | StreamRfc3920Chg);
2918 }
2919 else
2920 return dropXml(xml,"unhandled sasl digest md5 state");
2921 }
2922 else {
2923 // Plain
2924 if (t != XmlTag::Success)
2925 return dropXml(xml,"unexpected element");
2926 }
2927 // Authenticated. Bind a resource
2928 Debug(this,DebugAll,"Authenticated [%p]",this);
2929 TelEngine::destruct(xml);
2930 TelEngine::destruct(m_sasl);
2931 setFlags(StreamAuthenticated);
2932 XmlElement* start = buildStreamStart();
2933 return sendStreamXml(WaitStart,start);
2934 }
2935
2936 XMPPUtils::IqType iq = XMPPUtils::iqType(xml->attribute("type"));
2937 String* id = xml->getAttribute("id");
2938
2939 // Waiting for bind response
2940 if (flag(StreamWaitBindRsp)) {
2941 // Expecting 'iq' result or error
2942 if (t != XmlTag::Iq ||
2943 (iq != XMPPUtils::IqResult && iq != XMPPUtils::IqError) ||
2944 !id || *id != "bind_1")
2945 return dropXml(xml,"unexpected element");
2946 if (iq == XMPPUtils::IqError) {
2947 Debug(this,DebugNote,"Resource binding failed [%p]",this);
2948 terminate(0,true,xml);
2949 return false;
2950 }
2951 // Check it
2952 bool ok = false;
2953 while (true) {
2954 XmlElement* bind = XMPPUtils::findFirstChild(*xml,XmlTag::Bind,XMPPNamespace::Bind);
2955 if (!bind)
2956 break;
2957 XmlElement* tmp = bind->findFirstChild(&XMPPUtils::s_tag[XmlTag::Jid]);
2958 if (!tmp)
2959 break;
2960 JabberID jid(tmp->getText());
2961 if (jid.bare() != m_local.bare())
2962 break;
2963 ok = true;
2964 if (m_local.resource() != jid.resource()) {
2965 m_local.resource(jid.resource());
2966 Debug(this,DebugAll,"Resource set to '%s' [%p]",
2967 local().resource().c_str(),this);
2968 }
2969 break;
2970 }
2971 if (!ok)
2972 return destroyDropXml(xml,XMPPError::UndefinedCondition,
2973 "unacceptable bind response");
2974 resetFlags(StreamWaitBindRsp);
2975 TelEngine::destruct(xml);
2976 if (!m_features.get(XMPPNamespace::Session)) {
2977 changeState(Running);
2978 return true;
2979 }
2980 // Send session
2981 XmlElement* sess = XMPPUtils::createIq(XMPPUtils::IqSet,0,0,"sess_1");
2982 sess->addChild(XMPPUtils::createElement(XmlTag::Session,XMPPNamespace::Session));
2983 setFlags(StreamWaitSessRsp);
2984 return sendStreamXml(state(),sess);
2985 }
2986
2987 // Waiting for session response
2988 if (flag(StreamWaitSessRsp)) {
2989 // Expecting 'iq' result or error
2990 if (t != XmlTag::Iq ||
2991 (iq != XMPPUtils::IqResult && iq != XMPPUtils::IqError) ||
2992 !id || *id != "sess_1")
2993 return dropXml(xml,"unexpected element");
2994 if (iq == XMPPUtils::IqError) {
2995 Debug(this,DebugNote,"Session failed [%p]",this);
2996 terminate(0,true,xml);
2997 return false;
2998 }
2999 TelEngine::destruct(xml);
3000 resetFlags(StreamWaitSessRsp);
3001 changeState(Running);
3002 return true;
3003 }
3004
3005 return dropXml(xml,"unhandled");
3006 }
3007
3008 // Process elements in Register state
processRegister(XmlElement * xml,const JabberID & from,const JabberID & to)3009 bool JBClientStream::processRegister(XmlElement* xml, const JabberID& from,
3010 const JabberID& to)
3011 {
3012 if (!xml)
3013 return true;
3014 int t, ns;
3015 if (!XMPPUtils::getTag(*xml,t,ns))
3016 return dropXml(xml,"failed to retrieve element tag");
3017 if (t != XmlTag::Iq)
3018 return dropXml(xml,"expecting 'iq'");
3019 XMPPUtils::IqType iq = XMPPUtils::iqType(xml->attribute("type"));
3020 if (iq != XMPPUtils::IqResult && iq != XMPPUtils::IqError)
3021 return dropXml(xml,"expecting 'iq' response");
3022 if (!isRegisterId(*xml))
3023 return dropXml(xml,"unexpected response id");
3024 if (iq == XMPPUtils::IqError) {
3025 m_events.append(new JBEvent(JBEvent::RegisterFailed,this,xml,from,to));
3026 // Don't terminate if the user requested account change after authentication
3027 if (!flag(StreamAuthenticated))
3028 terminate(0,true,0,XMPPError::NoError);
3029 return flag(StreamAuthenticated);
3030 }
3031 // Requested registration data
3032 if (m_registerReq == '1') {
3033 // XEP-0077: check for username and password children or
3034 // instructions
3035 XmlElement* query = XMPPUtils::findFirstChild(*xml,XmlTag::Query,
3036 XMPPNamespace::IqRegister);
3037 if (query && XMPPUtils::findFirstChild(*query,XmlTag::Username) &&
3038 XMPPUtils::findFirstChild(*query,XmlTag::Password)) {
3039 TelEngine::destruct(xml);
3040 return requestRegister(true);
3041 }
3042 m_events.append(new JBEvent(JBEvent::RegisterFailed,this,xml,from,to));
3043 // Don't terminate if the user requested account change after authentication
3044 if (!flag(StreamAuthenticated))
3045 terminate(0,true,0,XMPPError::NoError);
3046 return flag(StreamAuthenticated);
3047 }
3048 // Requested registration/change
3049 if (m_registerReq == '2') {
3050 m_events.append(new JBEvent(JBEvent::RegisterOk,this,xml,from,to));
3051 // Reset register user flag
3052 resetFlags(RegisterUser);
3053 // Done if account changed after authentication
3054 if (flag(StreamAuthenticated)) {
3055 m_password = m_newPassword;
3056 return true;
3057 }
3058 // Start auth
3059 changeState(Features);
3060 return startAuth();
3061 }
3062 // Requested account removal
3063 if (m_registerReq == '3') {
3064 terminate(0,true,xml,XMPPError::Reg,"Account removed");
3065 return false;
3066 }
3067 return destroyDropXml(xml,XMPPError::Internal,"unhandled state");
3068 }
3069
3070 // Release memory
destroyed()3071 void JBClientStream::destroyed()
3072 {
3073 userData(0);
3074 JBStream::destroyed();
3075 }
3076
3077 // Start outgoing stream authentication
startAuth()3078 bool JBClientStream::startAuth()
3079 {
3080 if (incoming() || state() != Features)
3081 return false;
3082
3083 TelEngine::destruct(m_sasl);
3084
3085 XMPPFeatureSasl* sasl = m_features.getSasl();
3086 if (!sasl) {
3087 terminate(0,true,0,XMPPError::NoError,"Missing authentication data");
3088 return false;
3089 }
3090
3091 // RFC 3920 SASL auth
3092 int mech = XMPPUtils::AuthNone;
3093 if (sasl->mechanism(XMPPUtils::AuthMD5))
3094 mech = XMPPUtils::AuthMD5;
3095 else if (sasl->mechanism(XMPPUtils::AuthPlain) && flag(AllowPlainAuth))
3096 mech = XMPPUtils::AuthPlain;
3097 else {
3098 terminate(0,true,0,XMPPError::NoError,"Unsupported authentication mechanism");
3099 return false;
3100 }
3101
3102 m_sasl = new SASL(mech == XMPPUtils::AuthPlain);
3103 String rsp;
3104 if (m_sasl->m_plain) {
3105 m_sasl->setAuthParams(m_local.node(),m_password);
3106 if (!m_sasl->buildAuthRsp(rsp)) {
3107 terminate(0,true,0,XMPPError::NoError,"Invalid auth data length for plain auth");
3108 return false;
3109 }
3110 }
3111 else
3112 setFlags(StreamWaitChallenge);
3113 // MD5: send auth element, wait challenge
3114 // Plain auth: send auth element with credentials and wait response (success/failure)
3115 XmlElement* xml = XMPPUtils::createElement(XmlTag::Auth,XMPPNamespace::Sasl,rsp);
3116 xml->setAttribute("mechanism",lookup(mech,XMPPUtils::s_authMeth));
3117 return sendStreamXml(Auth,xml);
3118 }
3119
3120 // Start resource binding on outgoing stream
bind()3121 bool JBClientStream::bind()
3122 {
3123 Debug(this,DebugAll,"Binding resource [%p]",this);
3124 XmlElement* bind = XMPPUtils::createElement(XmlTag::Bind,XMPPNamespace::Bind);
3125 if (m_local.resource())
3126 bind->addChild(XMPPUtils::createElement(XmlTag::Resource,m_local.resource()));
3127 XmlElement* b = XMPPUtils::createIq(XMPPUtils::IqSet,0,0,"bind_1");
3128 b->addChild(bind);
3129 setFlags(StreamWaitBindRsp);
3130 return sendStreamXml(Auth,b);
3131 }
3132
3133
3134 /*
3135 * JBServerStream
3136 */
3137 // Build an incoming stream from a socket
JBServerStream(JBEngine * engine,Socket * socket,bool component)3138 JBServerStream::JBServerStream(JBEngine* engine, Socket* socket, bool component)
3139 : JBStream(engine,socket,component ? comp : s2s),
3140 m_remoteDomains(""), m_dbKey(0)
3141 {
3142 }
3143
3144 // Build an outgoing stream
JBServerStream(JBEngine * engine,const JabberID & local,const JabberID & remote,const char * dbId,const char * dbKey,bool dbOnly,const NamedList * params)3145 JBServerStream::JBServerStream(JBEngine* engine, const JabberID& local,
3146 const JabberID& remote, const char* dbId, const char* dbKey, bool dbOnly,
3147 const NamedList* params)
3148 : JBStream(engine,s2s,local,remote,0,params),
3149 m_remoteDomains(""), m_dbKey(0)
3150 {
3151 if (!(TelEngine::null(dbId) || TelEngine::null(dbKey)))
3152 m_dbKey = new NamedString(dbId,dbKey);
3153 if (dbOnly)
3154 setFlags(DialbackOnly | NoAutoRestart);
3155 }
3156
3157 // Constructor. Build an outgoing component stream
JBServerStream(JBEngine * engine,const JabberID & local,const JabberID & remote,const String * name,const NamedList * params)3158 JBServerStream::JBServerStream(JBEngine* engine, const JabberID& local, const JabberID& remote,
3159 const String* name, const NamedList* params)
3160 : JBStream(engine,comp,local,remote,name ? name->c_str() : 0,params),
3161 m_remoteDomains(""), m_dbKey(0)
3162 {
3163 if (params)
3164 m_password = params->getValue("password");
3165 }
3166
3167 // Send a dialback verify response
sendDbVerify(const char * from,const char * to,const char * id,XMPPError::Type rsp)3168 bool JBServerStream::sendDbVerify(const char* from, const char* to, const char* id,
3169 XMPPError::Type rsp)
3170 {
3171 adjustDbRsp(rsp);
3172 XmlElement* result = XMPPUtils::createDialbackVerifyRsp(from,to,id,rsp);
3173 DDebug(this,DebugAll,"Sending '%s' db:verify response from %s to %s [%p]",
3174 result->attribute("type"),from,to,this);
3175 return state() < Running ? sendStreamXml(state(),result) : sendStanza(result);
3176 }
3177
3178 // Send a dialback key response. Update the remote domains list
3179 // Terminate the stream if there are no more remote domains
sendDbResult(const JabberID & from,const JabberID & to,XMPPError::Type rsp)3180 bool JBServerStream::sendDbResult(const JabberID& from, const JabberID& to, XMPPError::Type rsp)
3181 {
3182 Lock lock(this);
3183 // Check local domain
3184 if (m_local != from)
3185 return false;
3186 // Respond only to received requests
3187 NamedString* p = m_remoteDomains.getParam(to);
3188 if (!p)
3189 return false;
3190 bool valid = rsp == XMPPError::NoError;
3191 // Don't deny already authenticated requests
3192 if (p->null() && !valid)
3193 return false;
3194 // Set request state or remove it if not accepted
3195 if (valid)
3196 p->clear();
3197 else
3198 m_remoteDomains.clearParam(to);
3199 bool ok = false;
3200 adjustDbRsp(rsp);
3201 XmlElement* result = XMPPUtils::createDialbackResult(from,to,rsp);
3202 DDebug(this,DebugAll,"Sending '%s' db:result response from %s to %s [%p]",
3203 result->attribute("type"),from.c_str(),to.c_str(),this);
3204 if (m_state < Running) {
3205 ok = sendStreamXml(Running,result);
3206 // Remove features and set the authenticated flag
3207 if (ok && valid) {
3208 m_features.remove(XMPPNamespace::Sasl);
3209 m_features.remove(XMPPNamespace::IqAuth);
3210 setFlags(StreamAuthenticated);
3211 // Compression can still be set
3212 if (!flag(StreamCompressed) && m_features.get(XMPPNamespace::CompressFeature))
3213 setFlags(StreamCanCompress);
3214 else
3215 resetFlags(StreamCanCompress);
3216 }
3217 }
3218 else if (m_state == Running)
3219 ok = sendStanza(result);
3220 else
3221 TelEngine::destruct(result);
3222 // Terminate the stream if there are no more remote domains
3223 if (!m_remoteDomains.count())
3224 terminate(-1,true,0,rsp);
3225 return ok;
3226 }
3227
3228 // Send dialback data (key/verify)
sendDialback()3229 bool JBServerStream::sendDialback()
3230 {
3231 State newState = Running;
3232 XmlElement* result = 0;
3233 if (!flag(DialbackOnly)) {
3234 if (flag(StreamAuthenticated))
3235 newState = Running;
3236 else {
3237 String key;
3238 engine()->buildDialbackKey(id(),m_local,m_remote,key);
3239 result = XMPPUtils::createDialbackKey(m_local,m_remote,key);
3240 newState = Auth;
3241 }
3242 }
3243 else if (!m_dbKey) {
3244 // Dialback only with no key?
3245 Debug(this,DebugNote,"Outgoing dialback stream with no key! [%p]",this);
3246 terminate(0,true,0,XMPPError::Internal);
3247 return false;
3248 }
3249 if (m_dbKey) {
3250 XmlElement* db = XMPPUtils::createDialbackVerify(m_local,m_remote,
3251 m_dbKey->name(),*m_dbKey);
3252 if (result)
3253 return sendStreamXml(newState,result,db);
3254 return sendStreamXml(newState,db);
3255 }
3256 if (result)
3257 return sendStreamXml(newState,result);
3258 changeState(newState);
3259 return true;
3260 }
3261
3262 // Release memory
destroyed()3263 void JBServerStream::destroyed()
3264 {
3265 TelEngine::destruct(m_dbKey);
3266 JBStream::destroyed();
3267 }
3268
3269 // Process elements in Running state
processRunning(XmlElement * xml,const JabberID & from,const JabberID & to)3270 bool JBServerStream::processRunning(XmlElement* xml, const JabberID& from,
3271 const JabberID& to)
3272 {
3273 if (!xml)
3274 return true;
3275 // Incoming, authenticated stream which might still request compression
3276 // Any other element will reset compression offer
3277 if (flag(StreamCanCompress)) {
3278 if (incoming() && !flag(StreamCompressed) &&
3279 m_features.get(XMPPNamespace::CompressFeature)) {
3280 int t = XmlTag::Count;
3281 int n = XMPPNamespace::Count;
3282 XMPPUtils::getTag(*xml,t,n);
3283 if (t == XmlTag::Compress && n == XMPPNamespace::Compress)
3284 return handleCompressReq(xml);
3285 }
3286 resetFlags(StreamCanCompress);
3287 m_features.remove(XMPPNamespace::CompressFeature);
3288 }
3289 // Check the tags of known dialback elements:
3290 // there are servers who don't stamp them with the namespace
3291 // Let other elements stamped with dialback namespace go the upper layer
3292 if (type() != comp && isDbResult(*xml)) {
3293 if (outgoing())
3294 return dropXml(xml,"dialback result on outgoing stream");
3295 return processDbResult(xml,from,to);
3296 }
3297 // Call default handler
3298 return JBStream::processRunning(xml,from,to);
3299 }
3300
3301 // Build a stream start XML element
buildStreamStart()3302 XmlElement* JBServerStream::buildStreamStart()
3303 {
3304 XmlElement* start = new XmlElement(XMPPUtils::s_tag[XmlTag::Stream],false);
3305 if (incoming())
3306 start->setAttribute("id",m_id);
3307 XMPPUtils::setStreamXmlns(*start);
3308 start->setAttribute(XmlElement::s_ns,XMPPUtils::s_ns[m_xmlns]);
3309 if (type() == s2s) {
3310 start->setAttribute(XmlElement::s_nsPrefix + "db",XMPPUtils::s_ns[XMPPNamespace::Dialback]);
3311 if (!dialback()) {
3312 start->setAttributeValid("from",m_local.bare());
3313 start->setAttributeValid("to",m_remote.bare());
3314 if (outgoing() || flag(StreamLocalVer1))
3315 start->setAttribute("version","1.0");
3316 start->setAttribute("xml:lang","en");
3317 }
3318 }
3319 else if (type() == comp) {
3320 if (incoming())
3321 start->setAttributeValid("from",m_remote.domain());
3322 else
3323 start->setAttributeValid("to",m_local.domain());
3324 }
3325 return start;
3326 }
3327
3328 // Process received elements in WaitStart state
3329 // WaitStart: Incoming: waiting for stream start
3330 // Outgoing: idem (our stream start was already sent)
3331 // Return false if stream termination was initiated
processStart(const XmlElement * xml,const JabberID & from,const JabberID & to)3332 bool JBServerStream::processStart(const XmlElement* xml, const JabberID& from,
3333 const JabberID& to)
3334 {
3335 XDebug(this,DebugAll,"JBServerStream::processStart() [%p]",this);
3336
3337 if (!processStreamStart(xml))
3338 return false;
3339
3340 if (type() == comp) {
3341 String from = xml->attribute("from");
3342 if (m_local == from) {
3343 changeState(Starting);
3344 m_events.append(new JBEvent(JBEvent::Start,this,0,to,JabberID::empty()));
3345 }
3346 else
3347 terminate(0,false,0,XMPPError::InvalidFrom);
3348 return false;
3349 }
3350
3351 if (outgoing()) {
3352 m_events.append(new JBEvent(JBEvent::Start,this,0,from,to));
3353 return true;
3354 }
3355
3356 // Incoming stream
3357 m_local = to;
3358 if (m_local && !engine()->hasDomain(m_local)) {
3359 terminate(0,true,0,XMPPError::HostUnknown);
3360 return false;
3361 }
3362 updateFromRemoteDef();
3363 m_events.append(new JBEvent(JBEvent::Start,this,0,from,to));
3364 return true;
3365 }
3366
3367 // Process elements in Auth state
processAuth(XmlElement * xml,const JabberID & from,const JabberID & to)3368 bool JBServerStream::processAuth(XmlElement* xml, const JabberID& from,
3369 const JabberID& to)
3370 {
3371 if (incoming())
3372 return dropXml(xml,"invalid state for incoming stream");
3373 // Component
3374 if (type() == comp) {
3375 int t,n;
3376 if (!XMPPUtils::getTag(*xml,t,n))
3377 return destroyDropXml(xml,XMPPError::Internal,"failed to retrieve element tag");
3378 if (t != XmlTag::Handshake || n != m_xmlns)
3379 return dropXml(xml,"expecting handshake in stream's namespace");
3380 // Stream authenticated
3381 TelEngine::destruct(xml);
3382 setFlags(StreamAuthenticated);
3383 changeState(Running);
3384 Debug(this,DebugAll,"Authenticated [%p]",this);
3385 return true;
3386 }
3387 // Waiting for db:result
3388 if (!isDbResult(*xml))
3389 return dropXml(xml,"expecting dialback result");
3390 // Result
3391 // Outgoing stream waiting for dialback key response
3392 if (outgoing()) {
3393 if (m_remote != from || m_local != to)
3394 return destroyDropXml(xml,XMPPError::BadAddressing,
3395 "dialback response with invalid 'from'");
3396 // Expect dialback key response
3397 int rsp = XMPPUtils::decodeDbRsp(xml);
3398 if (rsp != XMPPError::NoError) {
3399 terminate(1,false,xml,rsp);
3400 return false;
3401 }
3402 // Stream authenticated
3403 TelEngine::destruct(xml);
3404 setFlags(StreamAuthenticated);
3405 // Check compression
3406 XmlElement* x = checkCompress();
3407 if (x)
3408 return sendStreamXml(Compressing,x);
3409 changeState(Running);
3410 return true;
3411 }
3412 return dropXml(xml,"incomplete state process");
3413 }
3414
3415 // Start the stream (reply to received stream start)
startComp(const String & local,const String & remote)3416 bool JBServerStream::startComp(const String& local, const String& remote)
3417 {
3418 if (state() != Starting || type() != comp)
3419 return false;
3420 Lock lock(this);
3421 XmlElement* s = 0;
3422 if (incoming()) {
3423 m_local.set(local);
3424 m_remote.set(remote);
3425 s = buildStreamStart();
3426 }
3427 else {
3428 String digest;
3429 buildSha1Digest(digest,m_password);
3430 s = XMPPUtils::createElement(XmlTag::Handshake,digest);
3431 }
3432 setSecured();
3433 return sendStreamXml(incoming() ? Features : Auth,s);
3434 }
3435
3436 // Process dialback key (db:result) requests
processDbResult(XmlElement * xml,const JabberID & from,const JabberID & to)3437 bool JBServerStream::processDbResult(XmlElement* xml, const JabberID& from,
3438 const JabberID& to)
3439 {
3440 // Check TLS when stream:features were sent
3441 if (m_state == Features) {
3442 if (flag(TlsRequired) && !flag(StreamSecured))
3443 return destroyDropXml(xml,XMPPError::EncryptionRequired,
3444 "required encryption not supported by remote");
3445 // TLS can't be negotiated anymore
3446 setFlags(StreamSecured);
3447 }
3448 // Check remote domain
3449 if (!from)
3450 return destroyDropXml(xml,XMPPError::BadAddressing,
3451 "dialback result with empty 'from' domain");
3452 // Accept non empty key only
3453 const char* key = xml->getText();
3454 if (TelEngine::null(key))
3455 return destroyDropXml(xml,XMPPError::NotAcceptable,
3456 "dialback result with empty key");
3457 // Check local domain
3458 if (!(to && engine()->hasDomain(to))) {
3459 const char* reason = "dialback result with unknown 'to' domain";
3460 dropXml(xml,reason);
3461 XmlElement* rsp = XMPPUtils::createDialbackResult(to,from,XMPPError::ItemNotFound);
3462 if (m_state < Running)
3463 sendStreamXml(state(),rsp);
3464 else
3465 sendStanza(rsp);
3466 return false;
3467 }
3468 if (!m_local)
3469 m_local = to;
3470 else if (m_local != to)
3471 return destroyDropXml(xml,XMPPError::NotAcceptable,
3472 "dialback result with incorrect 'to' domain");
3473 // Ignore duplicate requests
3474 if (m_remoteDomains.getParam(from)) {
3475 dropXml(xml,"duplicate dialback key request");
3476 return false;
3477 }
3478 m_remoteDomains.addParam(from,key);
3479 DDebug(this,DebugAll,"Added db:result request from %s [%p]",from.c_str(),this);
3480 // Notify the upper layer of incoming request
3481 JBEvent* ev = new JBEvent(JBEvent::DbResult,this,xml,from,to);
3482 ev->m_text = key;
3483 m_events.append(ev);
3484 return true;
3485 }
3486
3487
3488 /*
3489 * JBClusterStream
3490 */
3491 // Build an incoming stream from a socket
JBClusterStream(JBEngine * engine,Socket * socket)3492 JBClusterStream::JBClusterStream(JBEngine* engine, Socket* socket)
3493 : JBStream(engine,socket,cluster)
3494 {
3495 }
3496
3497 // Build an outgoing stream
JBClusterStream(JBEngine * engine,const JabberID & local,const JabberID & remote,const NamedList * params)3498 JBClusterStream::JBClusterStream(JBEngine* engine, const JabberID& local,
3499 const JabberID& remote, const NamedList* params)
3500 : JBStream(engine,cluster,local,remote,0,params)
3501 {
3502 }
3503
3504 // Build a stream start XML element
buildStreamStart()3505 XmlElement* JBClusterStream::buildStreamStart()
3506 {
3507 XmlElement* start = new XmlElement(XMPPUtils::s_tag[XmlTag::Stream],false);
3508 if (incoming())
3509 start->setAttribute("id",m_id);
3510 XMPPUtils::setStreamXmlns(*start);
3511 start->setAttribute(XmlElement::s_ns,XMPPUtils::s_ns[m_xmlns]);
3512 start->setAttributeValid("from",m_local);
3513 start->setAttributeValid("to",m_remote);
3514 start->setAttribute("version","1.0");
3515 start->setAttribute("xml:lang","en");
3516 return start;
3517 }
3518
3519 // Process received elements in WaitStart state
3520 // WaitStart: Incoming: waiting for stream start
3521 // Outgoing: idem (our stream start was already sent)
3522 // Return false if stream termination was initiated
processStart(const XmlElement * xml,const JabberID & from,const JabberID & to)3523 bool JBClusterStream::processStart(const XmlElement* xml, const JabberID& from,
3524 const JabberID& to)
3525 {
3526 XDebug(this,DebugAll,"JBClusterStream::processStart() [%p]",this);
3527 if (!processStreamStart(xml))
3528 return false;
3529 // Check from/to
3530 bool ok = true;
3531 if (outgoing())
3532 ok = (m_local == to) && (m_remote == from);
3533 else {
3534 if (!m_remote) {
3535 m_local = to;
3536 m_remote = from;
3537 ok = from && to;
3538 }
3539 else
3540 ok = (m_local == to) && (m_remote == from);
3541 }
3542 if (!ok) {
3543 Debug(this,DebugNote,"Got invalid from='%s' or to='%s' in stream start [%p]",
3544 from.c_str(),to.c_str(),this);
3545 terminate(0,true,0,XMPPError::BadAddressing);
3546 return false;
3547 }
3548 m_events.append(new JBEvent(JBEvent::Start,this,0,m_remote,m_local));
3549 return true;
3550 }
3551
3552 // Process elements in Running state
processRunning(XmlElement * xml,const JabberID & from,const JabberID & to)3553 bool JBClusterStream::processRunning(XmlElement* xml, const JabberID& from, const JabberID& to)
3554 {
3555 if (!xml)
3556 return true;
3557 int t, ns;
3558 if (!XMPPUtils::getTag(*xml,t,ns))
3559 return dropXml(xml,"failed to retrieve element tag");
3560 JBEvent::Type evType = JBEvent::Unknown;
3561 XmlElement* child = 0;
3562 switch (t) {
3563 case XmlTag::Iq:
3564 checkPing(this,xml,m_pingId);
3565 evType = JBEvent::Iq;
3566 child = xml->findFirstChild();
3567 break;
3568 case XmlTag::Message:
3569 evType = JBEvent::Message;
3570 break;
3571 case XmlTag::Presence:
3572 evType = JBEvent::Presence;
3573 break;
3574 default: ;
3575 }
3576 m_events.append(new JBEvent(evType,this,xml,m_remote,m_local,child));
3577 return true;
3578 }
3579
3580 /* vi: set ts=8 sw=4 sts=4 noet: */
3581