1
2"""
3Infrastructure code for testing Gabble by pretending to be a Jabber server.
4"""
5
6import base64
7import os
8import hashlib
9import sys
10import random
11import re
12import traceback
13
14import ns
15import constants as cs
16import servicetest
17from servicetest import (
18    assertEquals, assertLength, assertContains, wrap_channel,
19    EventPattern, call_async, unwrap, Event)
20import twisted
21from twisted.words.xish import domish, xpath
22from twisted.words.protocols.jabber.client import IQ
23from twisted.words.protocols.jabber import xmlstream
24from twisted.internet import reactor, ssl
25
26import dbus
27
def make_result_iq(stream, iq, add_query_node=True):
    """Build (without sending) a 'result' IQ that answers `iq`.

    The reply copies the request's id. If the request carries a 'to'
    attribute, that value becomes the reply's 'from'. When add_query_node is
    true and the request has a first child element, an empty child with the
    same namespace and name is added to the reply.
    """
    reply = IQ(stream, "result")
    reply["id"] = iq["id"]

    recipient = iq.getAttribute('to')
    if recipient is not None:
        reply["from"] = recipient

    payload = iq.firstChildElement()
    if add_query_node and payload:
        reply.addElement((payload.uri, payload.name))

    return reply
40
def acknowledge_iq(stream, iq):
    """Send, on `stream`, the result IQ that make_result_iq() builds for `iq`."""
    ack = make_result_iq(stream, iq)
    stream.send(ack)
43
def send_error_reply(stream, iq, error_stanza=None):
    """Send an 'error' IQ on `stream` in reply to `iq`.

    The reply copies the request's id, uses the request's 'to' (if any) as
    its 'from', and echoes an empty copy of the request's first child
    element when there is one. If error_stanza is given it is appended as a
    further child.
    """
    reply = IQ(stream, "error")
    reply["id"] = iq["id"]

    payload = iq.firstChildElement()

    recipient = iq.getAttribute('to')
    if recipient is not None:
        reply["from"] = recipient

    if payload:
        reply.addElement((payload.uri, payload.name))

    if error_stanza:
        reply.addChild(error_stanza)

    stream.send(reply)
59
def request_muc_handle(q, conn, stream, muc_jid):
    """Request a handle for `muc_jid` and return it.

    Calls RequestHandles asynchronously with handle type 2 (presumably the
    room handle type -- confirm against the constants module), waits for the
    D-Bus return and yields the first handle of the reply. `stream` is
    accepted but not used.
    """
    servicetest.call_async(q, conn, 'RequestHandles', 2, [muc_jid])
    reply = q.expect('dbus-return', method='RequestHandles')
    handles = reply.value[0]
    return handles[0]
64
def make_muc_presence(affiliation, role, muc_jid, alias, jid=None, photo=None):
    """Build a presence stanza from occupant `alias` of room `muc_jid`.

    The stanza carries a MUC-user <x/> child whose <item/> records the given
    affiliation and role, plus the occupant's jid when one is supplied.
    If photo is not None, a vCard-update <x/> containing
    <photo>unicode(photo)</photo> is appended.
    """
    presence = domish.Element((None, 'presence'))
    presence['from'] = '%s/%s' % (muc_jid, alias)

    muc_item = presence.addElement((ns.MUC_USER, 'x')).addElement('item')
    muc_item['affiliation'] = affiliation
    muc_item['role'] = role
    if jid is not None:
        muc_item['jid'] = jid

    if photo is None:
        return presence

    vcard_update = elem(ns.VCARD_TEMP_UPDATE, 'x')
    photo_node = elem('photo')(unicode(photo))
    presence.addChild(vcard_update(photo_node))
    return presence
82
def sync_stream(q, stream):
    """Used to ensure that Gabble has processed all stanzas sent to it.

    Sends a disco#info 'get' IQ on `stream` and blocks until the queue sees
    the 'result' IQ bearing the same id.
    """
    ping = IQ(stream, "get")
    ping_id = ping['id']
    ping.addElement(('http://jabber.org/protocol/disco#info', 'query'))
    stream.send(ping)

    def is_our_reply(event):
        return event.stanza['id'] == ping_id and event.iq_type == 'result'

    q.expect('stream-iq', query_ns='http://jabber.org/protocol/disco#info',
        predicate=is_our_reply)
93
class GabbleAuthenticator(xmlstream.Authenticator):
    """Shared state for the fake server's authenticators.

    Holds the expected username, password and optional resource, the bare
    and full JIDs (None until set elsewhere), and the callback used to emit
    events, which starts out as a no-op.
    """

    def __init__(self, username, password, resource=None):
        self.username, self.password = username, password
        self.resource = resource
        self.bare_jid = self.full_jid = None
        # Swallow events until a real sink is installed.
        self._event_func = lambda _event: None
        xmlstream.Authenticator.__init__(self)

    def set_event_func(self, event_func):
        """Install `event_func` as the callback that receives events."""
        self._event_func = event_func
106
class JabberAuthenticator(GabbleAuthenticator):
    """Trivial XML stream authenticator that accepts one username/digest pair.

    Implements the jabber:iq:auth exchange: the first IQ is answered with the
    list of supported fields, the second is checked against the expected
    username, SHA-1 digest and (optionally) resource. When emit_events is
    true the IQs are reported through the event callback instead of being
    answered automatically, and the test is expected to call
    respondToInitialIq() / respondToSecondIq() itself.
    """

    # Patch in fix from http://twistedmatrix.com/trac/changeset/23418.
    # This monkeypatch taken from Gadget source code
    from twisted.words.xish.utility import EventDispatcher

    # NOTE(review): there is no early return after queueing, so the original
    # implementation is also invoked during a dispatch -- left untouched
    # here; confirm this is intended before changing it.
    def _addObserver(self, onetime, event, observerfn, priority, *args,
            **kwargs):
        if self._dispatchDepth > 0:
            self._updateQueue.append(lambda: self._addObserver(onetime, event,
                observerfn, priority, *args, **kwargs))

        return self._oldAddObserver(onetime, event, observerfn, priority,
            *args, **kwargs)

    EventDispatcher._oldAddObserver = EventDispatcher._addObserver
    EventDispatcher._addObserver = _addObserver

    def __init__(self, username, password, resource=None, emit_events=False):
        GabbleAuthenticator.__init__(self, username, password, resource)
        self.emit_events = emit_events

    def streamStarted(self, root=None):
        """Send the stream header and wait for the first auth IQ.

        When a root element is supplied, a random hexadecimal stream id is
        chosen and the domain is taken from the root's 'to' attribute.
        """
        if root:
            self.xmlstream.sid = '%x' % random.randint(1, sys.maxint)
            self.xmlstream.domain = root.getAttribute('to')

        self.xmlstream.sendHeader()
        self.xmlstream.addOnetimeObserver(
            "/iq/query[@xmlns='jabber:iq:auth']", self.initialIq)

    def initialIq(self, iq):
        """Handle the first auth IQ, then start watching for the second."""
        if self.emit_events:
            self._event_func(Event('auth-initial-iq', authenticator=self,
                iq=iq, id=iq["id"]))
        else:
            self.respondToInitialIq(iq)

        self.xmlstream.addOnetimeObserver('/iq/query/username', self.secondIq)

    def respondToInitialIq(self, iq):
        """Reply to the first auth IQ with the list of auth fields."""
        result = IQ(self.xmlstream, "result")
        result["id"] = iq["id"]
        query = result.addElement('query')
        query["xmlns"] = "jabber:iq:auth"
        query.addElement('username', content='test')
        query.addElement('password')
        query.addElement('digest')
        query.addElement('resource')
        self.xmlstream.send(result)

    def secondIq(self, iq):
        """Handle the IQ carrying the credentials."""
        if self.emit_events:
            self._event_func(Event('auth-second-iq', authenticator=self,
                iq=iq, id=iq["id"]))
        else:
            self.respondToSecondIq(iq)

    def respondToSecondIq(self, iq):
        """Verify the credentials in `iq`, then mark the stream as authed.

        Asserts that exactly the expected username and digest were sent and
        that exactly one <resource/> is present (matching self.resource when
        one was configured). Records bare_jid and full_jid, sends an empty
        result and dispatches STREAM_AUTHD_EVENT.
        """
        usernames = xpath.queryForNodes('/iq/query/username', iq)
        assert [str(node) for node in usernames] == [self.username]

        digests = xpath.queryForNodes('/iq/query/digest', iq)
        expect = hashlib.sha1(self.xmlstream.sid + self.password).hexdigest()
        assert [str(node) for node in digests] == [expect]

        resources = xpath.queryForNodes('/iq/query/resource', iq)
        assertLength(1, resources)
        # Use the text of the single <resource/> node; formatting the node
        # list itself would embed a list repr in the full JID.
        resource = str(resources[0])
        if self.resource is not None:
            assertEquals(self.resource, resource)

        self.bare_jid = '%s@%s' % (self.username, self.xmlstream.domain)
        self.full_jid = '%s/%s' % (self.bare_jid, resource)

        result = IQ(self.xmlstream, "result")
        result["id"] = iq["id"]
        self.xmlstream.send(result)
        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)
186
class XmppAuthenticator(GabbleAuthenticator):
    """Authenticator for XMPP 1.0 streams: SASL, then bind and session.

    The first stream offers the SASL mechanisms in self._mechanisms. After a
    successful <auth/>, the restarted stream offers bind and session.
    """

    def __init__(self, username, password, resource=None):
        GabbleAuthenticator.__init__(self, username, password, resource)
        self._mechanisms = ['PLAIN']
        self.authenticated = False

    def streamInitialize(self, root):
        """Record stream id and domain from `root`, then send the header.

        A random hexadecimal id is generated when none ends up being set.
        """
        stream = self.xmlstream
        if root:
            stream.sid = root.getAttribute('id')
            stream.domain = root.getAttribute('to')

        if stream.sid is None:
            stream.sid = '%x' % random.randint(1, sys.maxint)

        stream.sendHeader()

    def streamIQ(self):
        """Advertise bind and session, and observe the matching IQs once."""
        offer = elem(xmlstream.NS_STREAMS, 'features')(
            elem(ns.NS_XMPP_BIND, 'bind'),
            elem(ns.NS_XMPP_SESSION, 'session'),
        )
        self.xmlstream.send(offer)

        bind_path = "/iq/bind[@xmlns='%s']" % ns.NS_XMPP_BIND
        self.xmlstream.addOnetimeObserver(bind_path, self.bindIq)
        session_path = "/iq/session[@xmlns='%s']" % ns.NS_XMPP_SESSION
        self.xmlstream.addOnetimeObserver(session_path, self.sessionIq)

    def streamSASL(self):
        """Advertise the SASL mechanisms and observe the next <auth/>."""
        offer = domish.Element((xmlstream.NS_STREAMS, 'features'))
        mech_list = offer.addElement((ns.NS_XMPP_SASL, 'mechanisms'))
        for name in self._mechanisms:
            mech_list.addElement('mechanism', content=name)
        self.xmlstream.send(offer)

        self.xmlstream.addOnetimeObserver("/auth", self.auth)

    def streamStarted(self, root=None):
        """Initialise the stream and send the features for this phase."""
        self.streamInitialize(root)

        if not self.authenticated:
            self.streamSASL()
            return

        # Initiator authenticated itself, and has started a new stream.
        self.streamIQ()

    def auth(self, auth):
        """Check the PLAIN credentials, report success, reset the stream."""
        expected = '\x00%s\x00%s' % (self.username, self.password)
        assert base64.b64decode(str(auth)) == expected

        self.xmlstream.send(domish.Element((ns.NS_XMPP_SASL, 'success')))
        self.xmlstream.reset()
        self.authenticated = True

    def bindIq(self, iq):
        """Answer the bind request and signal that the stream is authed.

        The requested resource must equal self.resource when one was
        configured, and must be present otherwise.
        """
        resource = xpath.queryForString('/iq/bind/resource', iq)
        if self.resource is None:
            assert resource is not None
        else:
            assertEquals(self.resource, resource)

        reply = IQ(self.xmlstream, "result")
        reply["id"] = iq["id"]
        bind = reply.addElement((ns.NS_XMPP_BIND, 'bind'))
        self.bare_jid = '%s@%s' % (self.username, self.xmlstream.domain)
        self.full_jid = '%s/%s' % (self.bare_jid, resource)
        bind.addElement('jid', content=self.full_jid)
        self.xmlstream.send(reply)

        self.xmlstream.dispatch(self.xmlstream, xmlstream.STREAM_AUTHD_EVENT)

    def sessionIq(self, iq):
        """Acknowledge the session request with a result IQ."""
        ack = make_result_iq(self.xmlstream, iq)
        self.xmlstream.send(ack)
262
class StreamEvent(servicetest.Event):
    """Event for a stanza seen on a stream.

    Exposes the stanza (through the base Event), the stream it arrived on,
    and the stanza's 'to' attribute as `to`.
    """

    def __init__(self, type_, stanza, stream):
        servicetest.Event.__init__(self, type_, stanza=stanza)
        recipient = stanza.getAttribute("to")
        self.stream = stream
        self.to = recipient
268
class IQEvent(StreamEvent):
    """'stream-iq' event.

    Always sets iq_type, iq_id and query. query_ns and query_name are set
    only when the IQ has a child element, and query_node only when that
    child has a non-empty 'node' attribute.
    """

    def __init__(self, stream, iq):
        StreamEvent.__init__(self, 'stream-iq', iq, stream)
        self.iq_type = iq.getAttribute("type")
        self.iq_id = iq.getAttribute("id")

        child = iq.firstChildElement()
        if not child:
            self.query = None
            return

        self.query = child
        self.query_ns = child.uri
        self.query_name = child.name

        node = child.getAttribute("node")
        if node:
            self.query_node = node
286
class PresenceEvent(StreamEvent):
    """'stream-presence' event.

    Sets presence_type from the stanza's 'type' attribute. presence_status
    is set, from the first <status/> child, only when one exists.
    """

    def __init__(self, stream, stanza):
        StreamEvent.__init__(self, 'stream-presence', stanza, stream)
        self.presence_type = stanza.getAttribute('type')

        status_nodes = xpath.queryForNodes('/presence/status', stanza)
        if not status_nodes:
            return

        self.presence_status = str(status_nodes[0])
296
class MessageEvent(StreamEvent):
    """'stream-message' event; message_type is the stanza's 'type'."""

    def __init__(self, stream, stanza):
        StreamEvent.__init__(self, 'stream-message', stanza, stream)
        kind = stanza.getAttribute('type')
        self.message_type = kind
301
class StreamFactory(twisted.internet.protocol.Factory):
    """Factory that hands out pre-built streams, one per incoming connection.

    With more than one stream it also acts as a tiny server between them:
    it relays presence among the connected JIDs, answers roster requests
    with the other JIDs, and forwards IQs addressed to a known JID.
    """

    def __init__(self, streams, jids):
        self.streams = streams
        self.jids = jids
        self.presences = {}

        def pair(jid, stream):
            return (jid, stream)

        self.mappings = dict(map(pair, jids, streams))

        # Work on a reversed copy of the streams so that protocol() can pop
        # them off the end in their original order.
        self.factory_streams = list(streams)
        self.factory_streams.reverse()

        # Do not add observers for single instances because it's unnecessary
        # and some unit tests need to respond to the roster request, and we
        # shouldn't answer it for them otherwise we break compatibility
        if len(streams) <= 1:
            return

        # A helper function is needed so that each pair of callbacks closes
        # over its own stream and jid rather than the loop variables.
        def add_observers(stream, jid):
            def on_iq(stanza):
                return self.forward_iq(stream, jid, stanza)

            def on_presence(stanza):
                return self.got_presence(stream, jid, stanza)

            stream.addObserver('/iq', on_iq)
            stream.addObserver('/presence', on_presence)

        for (jid, stream) in self.mappings.items():
            add_observers(stream, jid)

    def protocol(self, *args):
        """Return the next unused stream."""
        return self.factory_streams.pop()

    def got_presence(self, stream, jid, stanza):
        """Record `jid`'s presence and exchange presences with the others."""
        stanza.attributes['from'] = jid
        self.presences[jid] = stanza

        for dest_jid in list(self.presences):
            # Dispatch the new presence to other clients
            stanza.attributes['to'] = dest_jid
            self.mappings[dest_jid].send(stanza)

            # Don't echo the presence twice
            if dest_jid == jid:
                continue

            # Dispatch other client's presence to this stream
            other = self.presences[dest_jid]
            other.attributes['to'] = jid
            stream.send(other)

    def lost_presence(self, stream, jid):
        """Forget `jid`'s presence and tell the rest it is unavailable."""
        if jid not in self.presences:
            return

        del self.presences[jid]
        for dest_jid in list(self.presences):
            gone = domish.Element(('jabber:client', 'presence'))
            gone['from'] = jid
            gone['to'] = dest_jid
            gone['type'] = 'unavailable'
            self.mappings[dest_jid].send(gone)

    def forward_iq(self, stream, jid, stanza):
        """Answer roster requests locally; relay other IQs to their target."""
        stanza.attributes['from'] = jid

        payload = stanza.firstChildElement()

        # Fake other accounts as being part of our roster
        if payload and payload.uri == ns.ROSTER:
            roster = make_result_iq(stream, stanza)
            roster_query = roster.firstChildElement()
            for roster_jid in list(self.mappings):
                if roster_jid == jid:
                    continue
                item = roster_query.addElement('item')
                item['jid'] = roster_jid
                item['subscription'] = 'both'
            stream.send(roster)
            return

        target = stanza.getAttribute('to')
        dest = self.mappings.get(target) if target is not None else None

        if dest is not None:
            dest.send(stanza)
383
class BaseXmlStream(xmlstream.XmlStream):
    """Server-side XML stream that reports traffic to an event callback.

    Every IQ, message and presence stanza is wrapped in the matching event
    class and passed to event_func. Once authenticated, the stream answers
    disco#info queries addressed to its domain and to the bare JID.
    """

    initiating = False
    namespace = 'jabber:client'
    pep_support = True
    disco_features = []
    handle_privacy_lists = True

    def __init__(self, event_func, authenticator):
        xmlstream.XmlStream.__init__(self, authenticator)
        self.event_func = event_func

        def on_iq(stanza):
            return event_func(IQEvent(self, stanza))

        def on_message(stanza):
            return event_func(MessageEvent(self, stanza))

        def on_presence(stanza):
            return event_func(PresenceEvent(self, stanza))

        self.addObserver('//iq', on_iq)
        self.addObserver('//message', on_message)
        self.addObserver('//presence', on_presence)
        self.addObserver('//event/stream/authd', self._cb_authd)

        if self.handle_privacy_lists:
            privacy_path = "/iq/query[@xmlns='%s']" % ns.PRIVACY
            self.addObserver(privacy_path, self._cb_priv_list)

    def connectionMade(self):
        """Chain up, then disable Nagle if GABBLE_NODELAY is in the env."""
        xmlstream.XmlStream.connectionMade(self)

        if 'GABBLE_NODELAY' not in os.environ:
            return
        self.transport.setTcpNoDelay(True)

    def _cb_priv_list(self, iq):
        """Reject privacy-list queries with a bare error reply."""
        send_error_reply(self, iq)

    def _cb_authd(self, _):
        """Install the disco#info handlers and announce authentication."""
        # called when stream is authenticated
        auth = self.authenticator
        assert auth.full_jid is not None
        assert auth.bare_jid is not None

        disco_to = (
            "/iq[@to='%s']/query[@xmlns='http://jabber.org/protocol/disco#info']")
        self.addObserver(disco_to % self.domain, self._cb_disco_iq)
        self.addObserver(disco_to % auth.bare_jid,
            self._cb_bare_jid_disco_iq)
        self.event_func(servicetest.Event('stream-authenticated'))

    def _cb_disco_iq(self, iq):
        """Reply to a disco#info on the domain, listing disco_features."""
        query = xpath.queryForNodes(
            "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
            iq)[0]

        for var in self.disco_features:
            query.addChild(elem('feature', var=var))

        # Turn the request itself into the reply.
        iq['type'] = 'result'
        iq['from'] = iq['to']
        self.send(iq)

    def _cb_bare_jid_disco_iq(self, iq):
        """Reply to a disco#info on the bare JID, advertising PEP."""
        # advertise PEP support
        query = xpath.queryForNodes(
            "/iq/query[@xmlns='http://jabber.org/protocol/disco#info']",
            iq)[0]
        pep = query.addElement('identity')
        pep['category'] = 'pubsub'
        pep['type'] = 'pep'

        iq['type'] = 'result'
        iq['from'] = iq['to']
        self.send(iq)

    def onDocumentEnd(self):
        """Report 'stream-closed' without chaining up."""
        self.event_func(servicetest.Event('stream-closed'))
        # We don't chain up XmlStream.onDocumentEnd() because it will
        # disconnect the TCP connection making tests as
        # connect/disconnect-timeout.py not working

    def connectionLost(self, reason):
        """Report 'stream-connection-lost', then chain up."""
        self.event_func(servicetest.Event('stream-connection-lost'))
        xmlstream.XmlStream.connectionLost(self, reason)

    def send_stream_error(self, error='system-shutdown'):
        """Send a stream-level <error/> containing the `error` condition."""
        # Yes, there are meant to be two different STREAMS namespaces.
        condition = elem(ns.STREAMS, error)
        go_away = elem(xmlstream.NS_STREAMS, 'error')(condition)

        self.send(go_away)
472
class JabberXmlStream(BaseXmlStream):
    """BaseXmlStream variant whose stream `version` is (0, 9)."""
    version = (0, 9)
475
class XmppXmlStream(BaseXmlStream):
    """BaseXmlStream variant whose stream `version` is (1, 0)."""
    version = (1, 0)
478
class GoogleXmlStream(BaseXmlStream):
    """Version (1, 0) stream that advertises Google features and no PEP."""

    version = (1, 0)

    pep_support = False
    disco_features = [
        ns.GOOGLE_ROSTER,
        ns.GOOGLE_JINGLE_INFO,
        ns.GOOGLE_MAIL_NOTIFY,
        ns.GOOGLE_QUEUE,
    ]

    def _cb_bare_jid_disco_iq(self, iq):
        """Send the query back as a result without adding any identity."""
        # Google talk doesn't support PEP :(
        iq['from'] = iq['to']
        iq['type'] = 'result'
        self.send(iq)
494
495
def make_connection(bus, event_func, params=None, suffix=''):
    """Create a gabble/jabber connection with test defaults.

    `params` overrides the defaults. Setting 'password' or 'account' to
    None removes that parameter altogether. Returns (conn, jid), where jid
    is the final 'account' value or None if it was removed.
    """
    # Gabble accepts a resource in 'account', but the value of 'resource'
    # overrides it if there is one.
    test_name = re.sub(r'(.*tests/twisted/|\./)', '', sys.argv[0])

    settings = {
        'account': 'test%s@localhost/%s' % (suffix, test_name),
        'password': 'pass',
        'resource': 'Resource',
        'server': 'localhost',
        'port': dbus.UInt32(4242),
        'fallback-socks5-proxies': dbus.Array([], signature='s'),
        'require-encryption': False,
    }

    if params:
        settings.update(params)

    # Allow omitting the 'password' and 'account' params
    for optional in ('password', 'account'):
        if settings[optional] is None:
            del settings[optional]

    jid = settings.get('account', None)
    conn = servicetest.make_connection(bus, event_func, 'gabble', 'jabber',
                                       settings)
    return (conn, jid)
527
def make_stream(event_func, authenticator=None, protocol=None,
                resource=None, suffix=''):
    """Build the server-side stream for one test connection.

    Without an authenticator, an XmppAuthenticator for user 'test<suffix>'
    with password 'pass' is created. Without a protocol, XmppXmlStream is
    used. The authenticator's event callback is always set to event_func.
    """
    # set up Jabber server
    if authenticator is None:
        authenticator = XmppAuthenticator(
            'test%s' % suffix, 'pass', resource=resource)

    authenticator.set_event_func(event_func)

    stream_class = XmppXmlStream if protocol is None else protocol
    return stream_class(event_func, authenticator)
541
def disconnect_conn(q, conn, stream, expected_before=[], expected_after=[]):
    """Disconnect `conn` cleanly and return the extra events matched.

    Waits for expected_before plus the Disconnected status change and the
    stream closing, sends the stream footer, then waits for expected_after
    plus the Disconnect return. Returns two lists holding only the events
    matched for expected_before and expected_after respectively.
    """
    call_async(q, conn, 'Disconnect')

    status_changed = EventPattern('dbus-signal', signal='StatusChanged',
        args=[cs.CONN_STATUS_DISCONNECTED, cs.CSR_REQUESTED])
    stream_closed = EventPattern('stream-closed')
    before_events = q.expect_many(
        *(expected_before + [status_changed, stream_closed]))

    stream.sendFooter()

    disconnect_returned = EventPattern('dbus-return', method='Disconnect')
    after_events = q.expect_many(*(expected_after + [disconnect_returned]))

    # Strip the patterns appended above from each result.
    return before_events[:-2], after_events[:-1]
557
def element_repr(element):
    r"""__repr__ cannot safely return non-ASCII: see
    <http://bugs.python.org/issue5876>. So we print non-ASCII characters as
    \uXXXX escapes in debug output

    Returns element.toXml() encoded with the 'unicode-escape' codec.

    The docstring is a raw string so that the literal backslash-u above is
    never parsed as a (malformed) unicode escape.
    """
    return element.toXml().encode('unicode-escape')
565
566def expect_connected(queue):
567    queue.expect('dbus-signal', signal='StatusChanged',
568        args=[cs.CONN_STATUS_CONNECTING, cs.CSR_REQUESTED])
569    queue.expect('stream-authenticated')
570    queue.expect('dbus-signal', signal='PresencesChanged',
571        args=[{1L: (cs.PRESENCE_AVAILABLE, u'available', '')}])
572    queue.expect('dbus-signal', signal='StatusChanged',
573        args=[cs.CONN_STATUS_CONNECTED, cs.CSR_REQUESTED])
574
def exec_test_deferred(fun, params, protocol=None, timeout=None,
                        authenticator=None, num_instances=1,
                        do_connect=True,
                        make_connection_func=make_connection,
                        expect_connected_func=expect_connected):
    """Set up connections and fake servers, run `fun`, then tear down.

    Creates num_instances connections (via make_connection_func) and one
    stream per connection, serves the streams on localhost:4242, and
    forwards every D-Bus signal to the event queue. If do_connect is true,
    each connection is connected and expect_connected_func(queue) is called
    for it. `fun` is then called as fun(queue, bus, conn, stream) for a
    single instance, or fun(queue, bus, conns, streams) otherwise.

    Afterwards the listening port is closed and every connection that still
    owns its bus name is disconnected. Once the port has stopped listening,
    the reactor is crashed on success, or the process exits with status 1 if
    the test or the teardown raised. The process also exits with status 1
    immediately if the session bus or a connection cannot be created.
    """
    # hack to ease debugging
    domish.Element.__repr__ = element_repr
    colourer = None

    if sys.stdout.isatty() or 'CHECK_FORCE_COLOR' in os.environ:
        colourer = servicetest.install_colourer()

    try:
        bus = dbus.SessionBus()
    except dbus.exceptions.DBusException as e:
        print e
        os._exit(1)

    queue = servicetest.IteratingEventQueue(timeout)
    # Verbose when CHECK_TWISTED_VERBOSE is non-empty or -v was passed.
    queue.verbose = (
        os.environ.get('CHECK_TWISTED_VERBOSE', '') != ''
        or '-v' in sys.argv)

    conns = []
    jids = []
    streams = []
    resource = params.get('resource') if params is not None else None
    for i in range(0, num_instances):
        # The first instance has no suffix; later ones use their index.
        if i == 0:
            suffix = ''
        else:
            suffix = str(i)

        try:
            (conn, jid) = make_connection_func(bus, queue.append, params, suffix)
        except Exception, e:
            # Crap. This is normally because the connection's still kicking
            # around on the bus. Let's bin any connections we *did* manage to
            # get going and then bail out unceremoniously.
            print e

            for conn in conns:
                conn.Disconnect()

            os._exit(1)

        conns.append(conn)
        jids.append(jid)
        streams.append(make_stream(queue.append, protocol=protocol,
                                   authenticator=authenticator,
                                   resource=resource, suffix=suffix))

    factory = StreamFactory(streams, jids)
    port = reactor.listenTCP(4242, factory, interface='localhost')

    def signal_receiver(*args, **kw):
        # When a connection's bus name loses its owner, tell the factory so
        # it can announce that JID as unavailable to the other streams.
        if kw['path'] == '/org/freedesktop/DBus' and \
                kw['member'] == 'NameOwnerChanged':
            bus_name, old_name, new_name = args
            if new_name == '':
                for i, conn in enumerate(conns):
                    stream = streams[i]
                    jid = jids[i]
                    if conn._requested_bus_name == bus_name:
                        factory.lost_presence(stream, jid)
                        break
        # Every signal, whatever it is, becomes a 'dbus-signal' event.
        queue.append(Event('dbus-signal',
                           path=unwrap(kw['path']),
                           signal=kw['member'], args=map(unwrap, args),
                           interface=kw['interface']))

    match_all_signals = bus.add_signal_receiver(
        signal_receiver,
        None,       # signal name
        None,       # interface
        None,
        path_keyword='path',
        member_keyword='member',
        interface_keyword='interface',
        byte_arrays=True
        )

    # Last exception raised by the test or the teardown, if any.
    error = None

    try:
        if do_connect:
            for conn in conns:
                conn.Connect()
                expect_connected_func(queue)

        if len(conns) == 1:
            fun(queue, bus, conns[0], streams[0])
        else:
            fun(queue, bus, conns, streams)
    except Exception, e:
        traceback.print_exc()
        error = e
        queue.verbose = False

    # Put back the stream that the colourer wrapped.
    if colourer:
        sys.stdout = colourer.fh

    d = port.stopListening()

    # Does the Connection object still exist?
    for i, conn in enumerate(conns):
        if not bus.name_has_owner(conn.object.bus_name):
            # Connection has already been disconnected and destroyed
            continue
        try:
            if conn.GetStatus() == cs.CONN_STATUS_CONNECTED:
                # Connection is connected, properly disconnect it
                disconnect_conn(queue, conn, streams[i])
            else:
                # Connection is not connected, call Disconnect() to destroy it
                conn.Disconnect()
        except dbus.DBusException, e:
            pass
        except Exception, e:
            traceback.print_exc()
            error = e

        # A second Disconnect() is expected to raise DBusException because
        # the connection should be gone; if it succeeds, that is recorded as
        # an error.
        try:
            conn.Disconnect()
            raise AssertionError("Connection didn't disappear; "
                "all subsequent tests will probably fail")
        except dbus.DBusException, e:
            pass
        except Exception, e:
            traceback.print_exc()
            error = e

    match_all_signals.remove()

    # Finish only once the port has actually stopped listening.
    if error is None:
        d.addBoth((lambda *args: reactor.crash()))
    else:
        # please ignore the POSIX behind the curtain
        d.addBoth((lambda *args: os._exit(1)))
714
715
def exec_test(fun, params=None, protocol=None, timeout=None,
              authenticator=None, num_instances=1, do_connect=True):
    """Run `fun` as a test inside the Twisted reactor.

    Schedules exec_test_deferred with the given arguments for when the
    reactor starts, then runs the reactor.
    """
    deferred_args = (fun, params, protocol, timeout, authenticator,
                     num_instances, do_connect)
    reactor.callWhenRunning(exec_test_deferred, *deferred_args)
    reactor.run()
722
# Useful routines for server-side vCard handling
# Module-level vCard state: returned by expect_and_handle_get_vcard() and
# replaced by expect_and_handle_set_vcard(). Starts as an empty <vCard/>.
current_vcard = domish.Element(('vcard-temp', 'vCard'))
725
def expect_and_handle_get_vcard(q, stream):
    """Wait for a vCard 'get' IQ and answer it with the current vCard."""
    event = q.expect('stream-iq', query_ns=ns.VCARD_TEMP,
        query_name='vCard', iq_type='get')

    request = event.stanza
    requested = request.firstChildElement()
    assert requested.name == 'vCard', requested.toXml()

    # Send back current vCard
    reply = make_result_iq(stream, request, add_query_node=False)
    reply.addChild(current_vcard)
    stream.send(reply)
738
def expect_and_handle_set_vcard(q, stream, check=None):
    """Wait for a vCard 'set' IQ, store its vCard and acknowledge it.

    If `check` is given it is called with the received vCard element before
    the vCard is stored.
    """
    global current_vcard

    event = q.expect('stream-iq', query_ns=ns.VCARD_TEMP,
        query_name='vCard', iq_type='set')
    request = event.stanza
    received = request.firstChildElement()
    assert received.name == 'vCard', received.toXml()

    if check is not None:
        check(received)

    # Update current vCard
    current_vcard = received

    ack = make_result_iq(stream, request)
    stream.send(ack)
754
def _elem_add(elem, *children):
    """Append each of `children` to `elem`.

    domish elements are added as child elements and unicode strings as text
    content. Anything else raises ValueError naming the offending object.
    """
    for child in children:
        if isinstance(child, domish.Element):
            elem.addChild(child)
        elif isinstance(child, unicode):
            elem.addContent(child)
        else:
            # Format the message here: ValueError does not interpolate its
            # arguments. The one-tuple keeps tuple children from being
            # unpacked by the % operator.
            raise ValueError(
                'invalid child object %r (must be element or unicode)'
                % (child,))
764
def elem(a, b=None, attrs={}, **kw):
    r"""
    Build a callable domish element: elem(name) or elem(namespace, name).
    Attributes come from keyword arguments and from `attrs` (which wins on
    conflict). 'xmlns:<prefix>' entries declare local prefixes and 'from_'
    sets the 'from' attribute. Calling the result appends children.

    >>> elem('foo')().toXml()
    u'<foo/>'
    >>> elem('foo', x='1')().toXml()
    u"<foo x='1'/>"
    >>> elem('foo', x='1')(u'hello').toXml()
    u"<foo x='1'>hello</foo>"
    >>> elem('foo', x='1')(u'hello',
    ...         elem('http://foo.org', 'bar', y='2')(u'bye')).toXml()
    u"<foo x='1'>hello<bar xmlns='http://foo.org' y='2'>bye</bar></foo>"
    >>> elem('foo', attrs={'xmlns:bar': 'urn:bar', 'bar:cake': 'yum'})(
    ...   elem('bar:e')(u'i')
    ... ).toXml()
    u"<foo xmlns:bar='urn:bar' bar:cake='yum'><bar:e>i</bar:e></foo>"
    """

    class _elem(domish.Element):
        def __call__(self, *children):
            _elem_add(self, *children)
            return self

    qname = (a, b) if b is not None else (None, a)
    element = _elem(qname)

    # Can't just update kw into attrs, because that *modifies the parameter's
    # default*. Thanks python.
    combined = {}
    combined.update(kw)
    combined.update(attrs)

    # First, let's pull namespaces out
    prefix_marker = 'xmlns:'
    plain = {}
    for key, value in combined.items():
        if key.startswith(prefix_marker):
            element.localPrefixes[key[len(prefix_marker):]] = value
        else:
            plain[key] = value

    for key, value in plain.items():
        element['from' if key == 'from_' else key] = value

    return element
814
def elem_iq(server, type, **kw):
    """Build a callable IQ of the given type on `server`.

    Keyword arguments become attributes ('from_' sets 'from'). Calling the
    result appends children and returns the IQ itself.
    """
    class _iq(IQ):
        def __call__(self, *children):
            _elem_add(self, *children)
            return self

    stanza = _iq(server, type)

    for key, value in kw.items():
        stanza['from' if key == 'from_' else key] = value

    return stanza
830
def make_presence(_from, to='test@localhost', type=None, show=None,
        status=None, caps=None, photo=None):
    """Build a presence stanza from `_from` to `to`.

    Each optional argument that is not None adds its piece: the 'type'
    attribute, a <show/> child, a <status/> child, a caps <c/> child with
    one attribute per item of `caps`, and a vCard-update <x/> containing
    <photo/>.
    """
    presence = domish.Element((None, 'presence'))
    presence['from'] = _from
    presence['to'] = to

    if type is not None:
        presence['type'] = type

    for child_name, text in (('show', show), ('status', status)):
        if text is not None:
            presence.addElement(child_name, content=text)

    if caps is not None:
        caps_node = presence.addElement(
            ('http://jabber.org/protocol/caps', 'c'))
        for attr, value in caps.items():
            caps_node[attr] = value

    # <x xmlns="vcard-temp:x:update"><photo>4a1...</photo></x>
    if photo is not None:
        update = presence.addElement((ns.VCARD_TEMP_UPDATE, 'x'))
        photo_node = update.addElement('photo')
        photo_node.addContent(photo)

    return presence
857