1# wpa_supplicant D-Bus interface tests
2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import binascii
8import logging
9logger = logging.getLogger()
10import subprocess
11import time
12import shutil
13import struct
14import sys
15
16try:
17    if sys.version_info[0] > 2:
18        from gi.repository import GObject as gobject
19    else:
20        import gobject
21    import dbus
22    dbus_imported = True
23except ImportError:
24    dbus_imported = False
25
26import hostapd
27from wpasupplicant import WpaSupplicant
28from utils import *
29from p2p_utils import *
30from test_ap_tdls import connect_2sta_open
31from test_ap_eap import check_altsubject_match_support
32from test_nfc_p2p import set_ip_addr_info
33from test_wpas_mesh import check_mesh_support, add_open_mesh_network
34
# Bus name and root object path used to reach the wpa_supplicant D-Bus service
WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1"
WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1"
# D-Bus interface names passed to Get/Set/GetAll and signal registration
# in the tests of this file
WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface"
WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS"
WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network"
WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS"
WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice"
WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer"
WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group"
WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup"
WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh"
46
def prepare_dbus(dev):
    """Set up the D-Bus objects needed by a test case.

    Returns a tuple (bus, wpas_obj, path, if_obj): the system bus, the proxy
    for the wpa_supplicant root object, the object path that GetInterface()
    reports for dev.ifname, and the proxy for that path.

    Raises HwsimSkip if the dbus module could not be imported or if any part
    of the D-Bus setup fails.
    """
    if not dbus_imported:
        logger.info("No dbus module available")
        raise HwsimSkip("No dbus module available")

    try:
        from dbus.mainloop.glib import DBusGMainLoop
        DBusGMainLoop(set_as_default=True)
        system_bus = dbus.SystemBus()
        root_obj = system_bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
        root_iface = dbus.Interface(root_obj, WPAS_DBUS_SERVICE)
        iface_path = root_iface.GetInterface(dev.ifname)
        iface_obj = system_bus.get_object(WPAS_DBUS_SERVICE, iface_path)
        return (system_bus, root_obj, iface_path, iface_obj)
    except Exception as e:
        # Any setup problem results in the test being skipped, not failed
        raise HwsimSkip("Could not connect to D-Bus: %s" % e)
62
class TestDbus(object):
    """Base helper for tests that wait for D-Bus signals in a main loop.

    Keeps track of the signal receivers registered through add_signal() and
    removes them in __exit__(). __enter__() is not defined here; the
    subclasses in this file provide it.
    """

    def __init__(self, bus):
        self.loop = gobject.MainLoop()
        self.signals = []
        self.bus = bus

    def __exit__(self, type, value, traceback):
        # Drop every receiver that add_signal() registered
        for match in self.signals:
            match.remove()

    def add_signal(self, handler, interface, name, byte_arrays=False):
        """Register handler for signal name on interface and remember it."""
        self.signals.append(
            self.bus.add_signal_receiver(handler,
                                         dbus_interface=interface,
                                         signal_name=name,
                                         byte_arrays=byte_arrays))

    def timeout(self, *args):
        """Stop the main loop; returns False to the caller."""
        logger.debug("timeout")
        self.loop.quit()
        return False
83
class alloc_fail_dbus(object):
    """Context manager for D-Bus operations expected to fail on allocation.

    On entry the command "TEST_ALLOC_FAIL <count>:<funcs>" is sent through
    dev.request(). On exit the block must have raised: a DBusException whose
    text contains the expected string is suppressed; for anything else the
    "GET_ALLOC_FAIL" reply is checked before the exception is propagated.
    """

    def __init__(self, dev, count, funcs, operation="Operation",
                 expected="NoMemory"):
        self._dev = dev
        self._count = count
        self._funcs = funcs
        self._operation = operation
        self._expected = expected

    def __enter__(self):
        reply = self._dev.request("TEST_ALLOC_FAIL %d:%s" % (self._count,
                                                              self._funcs))
        if "OK" not in reply:
            raise HwsimSkip("TEST_ALLOC_FAIL not supported")

    def __exit__(self, type, value, traceback):
        if type is None:
            # The operation completed without raising anything
            raise Exception("%s succeeded during out-of-memory" % self._operation)
        expected_error = (type == dbus.exceptions.DBusException and
                          self._expected in str(value))
        if expected_error:
            # Suppress the expected D-Bus error
            return True
        state = self._dev.request("GET_ALLOC_FAIL")
        if state != "0:%s" % self._funcs:
            raise Exception("%s did not trigger allocation failure" % self._operation)
        # Let the unexpected exception propagate to the caller
        return False
104
def start_ap(ap, ssid="test-wps",
             ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"):
    """Add an AP with a fixed WPS/WPA2-PSK configuration.

    Only the SSID and the UUID can be overridden by the caller; the value
    returned by hostapd.add_ap() is passed through.
    """
    params = dict(ssid=ssid,
                  eap_server="1",
                  wps_state="2",
                  wpa_passphrase="12345678",
                  wpa="2",
                  wpa_key_mgmt="WPA-PSK",
                  rsn_pairwise="CCMP",
                  ap_pin="12345670",
                  uuid=ap_uuid)
    return hostapd.add_ap(ap, params)
112
def test_dbus_getall(dev, apdev):
    """D-Bus GetAll"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # GetAll on the root object and on both interfaces of the interface
    # object; the results are only logged.
    props = wpas_obj.GetAll(WPAS_DBUS_SERVICE,
                            dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                                         str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props)))

    props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS,
                          dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props)))

    # Both lists are expected to be empty before the AP and the network
    # are added below.
    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected BSSs entry: " + str(res))

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected Networks entry: " + str(res))

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    netid = dev[0].add_network()
    dev[0].set_network(netid, "disabled", "0")
    dev[0].set_network_quoted(netid, "ssid", "test")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing BSSs entry: " + str(res))
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props)))
    # Format the BSSID items as colon-separated lowercase hex for comparison
    # with the AP's BSSID string
    bssid_str = ':'.join('%02x' % item for item in props['BSSID'])
    if bssid_str != bssid:
        raise Exception("Unexpected BSSID in BSSs entry")

    res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 1:
        raise Exception("Missing Networks entry: " + str(res))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0])
    props = net_obj.GetAll(WPAS_DBUS_NETWORK,
                           dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props)))
    # The SSID is compared including the double quotes
    ssid = props['Properties']['ssid']
    if ssid != '"test"':
        raise Exception("Unexpected SSID in network entry")
172
def test_dbus_getall_oom(dev, apdev):
    """D-Bus GetAll wpa_config_get_all() OOM"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    netid = dev[0].add_network()
    dev[0].set_network(netid, "disabled", "0")
    dev[0].set_network_quoted(netid, "ssid", "test")

    networks = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                          dbus_interface=dbus.PROPERTIES_IFACE)
    if len(networks) != 1:
        raise Exception("Missing Networks entry: " + str(networks))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, networks[0])

    # Repeat GetAll with the allocation failure count going from 1 to 49
    for count in range(1, 50):
        with alloc_fail(dev[0], count, "wpa_config_get_all"):
            try:
                net_obj.GetAll(WPAS_DBUS_NETWORK,
                               dbus_interface=dbus.PROPERTIES_IFACE)
            except dbus.exceptions.DBusException:
                # A D-Bus error from the call is tolerated here
                pass
193
def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False):
    """Read property prop of WPAS_DBUS_SERVICE from wpas_obj.

    dbus is the object providing PROPERTIES_IFACE (callers in this file pass
    the dbus module). When expect is not None, a differing value raises
    Exception. Returns the value that was read.
    """
    value = wpas_obj.Get(WPAS_DBUS_SERVICE, prop,
                         dbus_interface=dbus.PROPERTIES_IFACE,
                         byte_arrays=byte_arrays)
    mismatch = expect is not None and value != expect
    if mismatch:
        raise Exception("Unexpected %s: %s (expected: %s)" %
                        (prop, str(value), str(expect)))
    return value
202
def dbus_set(dbus, wpas_obj, prop, val):
    """Write val to property prop of WPAS_DBUS_SERVICE on wpas_obj.

    dbus is the object providing PROPERTIES_IFACE (callers in this file pass
    the dbus module).
    """
    props_iface = dbus.PROPERTIES_IFACE
    wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, dbus_interface=props_iface)
206
def test_dbus_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def expect_dbus_error(operation, fragment, accepted_msg):
        # Run operation and require a DBusException whose text contains
        # fragment; a call that returns normally is reported with accepted_msg.
        try:
            operation()
        except dbus.exceptions.DBusException as e:
            if fragment not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception(accepted_msg)

    # DebugLevel: valid change, two invalid values, then restore
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")
    dbus_set(dbus, wpas_obj, "DebugLevel", "debug")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug")
    invalid_levels = [(3, "Error.Failed: wrong property type"),
                      ("foo", "Error.Failed: wrong debug level value")]
    for (bad, fragment) in invalid_levels:
        expect_dbus_error(
            lambda: dbus_set(dbus, wpas_obj, "DebugLevel", bad),
            fragment,
            "Invalid DebugLevel value accepted: " + str(bad))
    dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump")
    dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump")

    # The two boolean debug properties go through the same sequence
    for flag in ("DebugTimestamp", "DebugShowKeys"):
        dbus_get(dbus, wpas_obj, flag, expect=True)
        dbus_set(dbus, wpas_obj, flag, False)
        dbus_get(dbus, wpas_obj, flag, expect=False)
        expect_dbus_error(
            lambda: dbus_set(dbus, wpas_obj, flag, "foo"),
            "Error.Failed: wrong property type",
            "Invalid %s value accepted" % flag)
        dbus_set(dbus, wpas_obj, flag, True)
        dbus_get(dbus, wpas_obj, flag, expect=True)

    interfaces = dbus_get(dbus, wpas_obj, "Interfaces")
    if len(interfaces) != 1:
        raise Exception("Unexpected Interfaces value: " + str(interfaces))

    eap_methods = dbus_get(dbus, wpas_obj, "EapMethods")
    if len(eap_methods) < 5 or "TTLS" not in eap_methods:
        raise Exception("Unexpected EapMethods value: " + str(eap_methods))

    capabilities = dbus_get(dbus, wpas_obj, "Capabilities")
    if len(capabilities) < 2 or "p2p" not in capabilities:
        raise Exception("Unexpected Capabilities value: " + str(capabilities))

    # WFDIEs: set and read back, reject an invalid value, then clear
    dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    wfd_ies = binascii.unhexlify("010006020304050608")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(wfd_ies))
    readback = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if wfd_ies != readback:
        raise Exception("WFDIEs value changed")
    expect_dbus_error(
        lambda: dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00')),
        "InvalidArgs",
        "Invalid WFDIEs value accepted")
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(wfd_ies))
    dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b''))
    readback = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True)
    if len(readback) != 0:
        raise Exception("WFDIEs not cleared properly")

    # Writing back the EapMethods value must be rejected as read-only
    eap_methods = dbus_get(dbus, wpas_obj, "EapMethods")
    expect_dbus_error(
        lambda: dbus_set(dbus, wpas_obj, "EapMethods", eap_methods),
        "InvalidArgs: Property is read-only",
        "Invalid Set accepted")

    expect_dbus_error(
        lambda: wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True,
                                dbus_interface=dbus.PROPERTIES_IFACE),
        "UnknownMethod",
        "Unknown method accepted")

    expect_dbus_error(
        lambda: wpas_obj.Get("foo", "DebugShowKeys",
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: No such property",
        "Invalid Get accepted")

    # Proxy created with introspect=False for the calls that pass integers
    # in place of the string arguments
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH,
                              introspect=False)
    expect_dbus_error(
        lambda: test_obj.Get(123, "DebugShowKeys",
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: Invalid arguments",
        "Invalid Get accepted")
    expect_dbus_error(
        lambda: test_obj.Get(WPAS_DBUS_SERVICE, 123,
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: Invalid arguments",
        "Invalid Get accepted")

    expect_dbus_error(
        lambda: wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs",
                             dbus.ByteArray(b'', variant_level=2),
                             dbus_interface=dbus.PROPERTIES_IFACE),
        "InvalidArgs: invalid message format",
        "Invalid Set accepted")
329
def test_dbus_set_global_properties(dev, apdev):
    """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    dev[0].set("model_name", "")
    # (property name, value expected before Set, value written and expected
    # after Set)
    props = [('Okc', '0', '1'), ('ModelName', '', 'blahblahblah')]

    for (name, initial, updated) in props:
        current = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if current != initial:
            raise Exception("Unexpected " + name + " value: " + str(current))

        if_obj.Set(WPAS_DBUS_IFACE, name, updated,
                   dbus_interface=dbus.PROPERTIES_IFACE)

        current = if_obj.Get(WPAS_DBUS_IFACE, name,
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if current != updated:
            raise Exception("Unexpected " + name + " value after set: " + str(current))
    dev[0].set("model_name", "")
351
def test_dbus_invalid_method(dev, apdev):
    """D-Bus invalid method"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Calling a method name that is not used anywhere else in this file
    try:
        wps.Foo()
    except dbus.exceptions.DBusException as e:
        if "UnknownMethod" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Unknown method accepted")

    # WPS.Start with an integer argument through a proxy created with
    # introspect=False
    raw_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    raw_wps = dbus.Interface(raw_obj, WPAS_DBUS_IFACE_WPS)
    try:
        raw_wps.Start(123)
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs: Invalid arg" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("WPS.Start with incorrect signature accepted")
372
def test_dbus_get_set_wps(dev, apdev):
    """D-Bus Get/Set for WPS properties"""
    try:
        _test_dbus_get_set_wps(dev, apdev)
    finally:
        # Runs whether or not the test body raised
        sta = dev[0]
        sta.request("SET wps_cred_processing 0")
        sta.request("SET config_methods display keypad virtual_display nfc_interface p2ps")
        restore = [("device_name", "Device A"),
                   ("manufacturer", ""),
                   ("model_name", ""),
                   ("model_number", ""),
                   ("serial_number", ""),
                   ("device_type", "0-00000000-0")]
        for field, value in restore:
            sta.set(field, value)
386
def _test_dbus_get_set_wps(dev, apdev):
    """Check Get/Set of the WPS interface properties and change signals.

    Covers ConfigMethods, ProcessCredentials, the device description strings
    and DeviceType, setting values through the control interface and through
    D-Bus. Finally toggles ProcessCredentials and requires a PropertiesChanged
    signal on both the WPS interface and the D-Bus properties interface.
    Raises Exception on any mismatch.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
               dbus_interface=dbus.PROPERTIES_IFACE)

    val = "display keypad virtual_display nfc_interface"
    dev[0].request("SET config_methods " + val)

    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val:
        raise Exception("Unexpected Get(ConfigMethods) result: " + config)

    val2 = "push_button display"
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2,
               dbus_interface=dbus.PROPERTIES_IFACE)
    config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if config != val2:
        raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config)

    dev[0].request("SET config_methods " + val)

    # ProcessCredentials is expected to be False only for
    # wps_cred_processing=1
    for i in range(3):
        dev[0].request("SET wps_cred_processing " + str(i))
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                         dbus_interface=dbus.PROPERTIES_IFACE)
        expected_val = i != 1
        if val != expected_val:
            raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val))

    # (control interface field, D-Bus property) pairs
    tests = [("device_name", "DeviceName"),
             ("manufacturer", "Manufacturer"),
             ("model_name", "ModelName"),
             ("model_number", "ModelNumber"),
             ("serial_number", "SerialNumber")]

    for f1, f2 in tests:
        val2 = "test-value-test"
        dev[0].set(f1, val2)
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != val2:
            raise Exception("Get(%s) returned unexpected value" % f2)
        val2 = "TEST-value"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2,
                   dbus_interface=dbus.PROPERTIES_IFACE)
        val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != val2:
            raise Exception("Get(%s) returned unexpected value after Set" % f2)

    # Octets expected from DeviceType after setting "5-0050F204-1". Compare
    # the whole value at once: the earlier chained comparison
    # (val[1] != 0x05 != val[2] != 0x00) did not check val[2] whenever
    # val[1] was correct.
    expected_dev_type = [0x00, 0x05, 0x00, 0x50, 0xf2, 0x04, 0x00, 0x01]

    dev[0].set("device_type", "5-0050F204-1")
    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if [int(octet) for octet in val] != expected_dev_type:
        raise Exception("DeviceType mismatch")
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val,
               dbus_interface=dbus.PROPERTIES_IFACE)
    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if [int(octet) for octet in val] != expected_dev_type:
        raise Exception("DeviceType mismatch after Set")

    val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08'
    if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2),
               dbus_interface=dbus.PROPERTIES_IFACE)
    val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType",
                     dbus_interface=dbus.PROPERTIES_IFACE,
                     byte_arrays=True)
    if val != val2:
        raise Exception("DeviceType mismatch after Set (2)")

    class TestDbusGetSet(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.signal_received = False
            self.signal_received_deprecated = False
            self.sets_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_sets)
            gobject.timeout_add(1000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS,
                            "PropertiesChanged")
            self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Handler for PropertiesChanged on the WPS interface
            logger.debug("PropertiesChanged: " + str(properties))
            if "ProcessCredentials" in properties:
                self.signal_received_deprecated = True
                if self.sets_done and self.signal_received:
                    self.loop.quit()

        def propertiesChanged2(self, interface_name, changed_properties,
                               invalidated_properties):
            # Handler for PropertiesChanged on the D-Bus properties interface
            logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            if interface_name != WPAS_DBUS_IFACE_WPS:
                return
            if "ProcessCredentials" in changed_properties:
                self.signal_received = True
                if self.sets_done and self.signal_received_deprecated:
                    self.loop.quit()

        def run_sets(self, *args):
            logger.debug("run_sets")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(1),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != True:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")
            if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                       dbus.Boolean(0),
                       dbus_interface=dbus.PROPERTIES_IFACE)
            if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials",
                          dbus_interface=dbus.PROPERTIES_IFACE) != False:
                raise Exception("Unexpected Get(ProcessCredentials) result after Set")

            # Must be the attribute the signal handlers check; the earlier
            # name (dbus_sets_done) was never read, so the loop always ran
            # until the timeout.
            self.sets_done = True
            return False

        def success(self):
            return self.signal_received and self.signal_received_deprecated

    with TestDbusGetSet(bus) as t:
        if not t.success():
            raise Exception("No signal received for ProcessCredentials change")
519
def test_dbus_wps_invalid(dev, apdev):
    """D-Bus invalid WPS operation"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Every one of these argument dictionaries must be rejected by WPS.Start
    invalid_args = [
        {'Role': 'foo', 'Type': 'pbc'},
        {'Role': 123, 'Type': 'pbc'},
        {'Type': 'pbc'},
        {'Role': 'enrollee'},
        {'Role': 'registrar'},
        {'Role': 'enrollee', 'Type': 123},
        {'Role': 'enrollee', 'Type': 'foo'},
        {'Role': 'enrollee', 'Type': 'pbc', 'Bssid': '02:33:44:55:66:77'},
        {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123},
        {'Role': 'enrollee', 'Type': 'pbc',
         'Bssid': dbus.ByteArray(b'12345')},
        {'Role': 'enrollee', 'Type': 'pbc', 'P2PDeviceAddress': 12345},
        {'Role': 'enrollee', 'Type': 'pbc',
         'P2PDeviceAddress': dbus.ByteArray(b'12345')},
        {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'},
    ]
    expected_prefix = "fi.w1.wpa_supplicant1.InvalidArgs"
    for args in invalid_args:
        try:
            wps.Start(args)
        except dbus.exceptions.DBusException as e:
            if not str(e).startswith(expected_prefix):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid WPS.Start() arguments accepted: " + str(args))
549
def test_dbus_wps_oom(dev, apdev):
    """D-Bus WPS operation (OOM)"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)
    sta = dev[0]

    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_state", "Get"):
        if_obj.Get(WPAS_DBUS_IFACE, "State",
                   dbus_interface=dbus.PROPERTIES_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = apdev[0]['bssid']
    sta.scan_for_bss(bssid, freq=2412)

    time.sleep(0.05)
    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_bsss", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_paths[0])
    with alloc_fail_dbus(sta, 1, "=wpas_dbus_getter_bss_rates", "Get"):
        bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                    dbus_interface=dbus.PROPERTIES_IFACE)
    with alloc_fail(sta, 1,
                    "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"):
        try:
            bss_obj.Get(WPAS_DBUS_BSS, "Rates",
                        dbus_interface=dbus.PROPERTIES_IFACE)
        except dbus.exceptions.DBusException:
            # A D-Bus error from this Get is tolerated
            pass

    netid = sta.add_network()
    sta.set_network(netid, "disabled", "0")
    sta.set_network_quoted(netid, "ssid", "test")

    for count in range(1, 3):
        with alloc_fail_dbus(sta, count, "=wpas_dbus_getter_networks", "Get"):
            if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                       dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(sta, 1, "wpas_dbus_getter_interfaces", "Get"):
        dbus_get(dbus, wpas_obj, "Interfaces")

    for count in range(1, 6):
        with alloc_fail_dbus(sta, count, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"):
            dbus_get(dbus, wpas_obj, "EapMethods")

    # The remaining two operations are expected to fail with an error text
    # other than the default "NoMemory"
    with alloc_fail_dbus(sta, 1, "wpas_dbus_setter_config_methods", "Set",
                         expected="Error.Failed: Failed to set property"):
        methods = "push_button display"
        if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", methods,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    with alloc_fail_dbus(sta, 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start",
                         "WPS.Start",
                         expected="UnknownError: WPS start failed"):
        wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'})
609
def test_dbus_wps_pbc(dev, apdev):
    """D-Bus WPS/PBC operation and signals"""
    sta = dev[0]
    try:
        _test_dbus_wps_pbc(dev, apdev)
    finally:
        # Sent whether or not the test body raised
        sta.request("SET wps_cred_processing 0")
616
def _test_dbus_wps_pbc(dev, apdev):
    """Run WPS PBC through D-Bus and wait for the resulting signals.

    Requires both an Event signal named "success" and a Credentials signal
    before the main loop times out, then waits for the connection.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    # The single BSS entry must carry WPS information with Type "pbc"
    bss_paths = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs',
                           dbus_interface=dbus.PROPERTIES_IFACE)
    if len(bss_paths) != 1:
        raise Exception("Missing BSSs entry: " + str(bss_paths))
    bss_path = bss_paths[0]
    bss_obj = bus.get_object(WPAS_DBUS_SERVICE, bss_path)
    props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE)
    logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, bss_path, str(props)))
    if 'WPS' not in props:
        raise Exception("No WPS information in the BSS entry")
    wps_info = props['WPS']
    if 'Type' not in wps_info:
        raise Exception("No Type field in the WPS dictionary")
    wps_type = wps_info['Type']
    if wps_type != 'pbc':
        raise Exception("Unexpected WPS Type: " + wps_type)

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            self.credentials_received = False
            self.wps = wps

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            # Stop only once the Credentials signal has been seen as well
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            # Stop only once the success event has been seen as well
            if self.success_seen:
                self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
    dev[0].request("DISCONNECT")
    hapd.disable()
    dev[0].flush_scan_cache()
686
def test_dbus_wps_pbc_overlap(dev, apdev):
    """D-Bus WPS/PBC operation and signal for PBC overlap"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    # Two APs, both with WPS_PBC requested
    hapd = start_ap(apdev[0])
    hapd2 = start_ap(apdev[1], ssid="test-wps2",
                     ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f")
    hapd.request("WPS_PBC")
    hapd2.request("WPS_PBC")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    bssid2 = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid2, freq="2412")

    class TestDbusWps(TestDbus):
        def __init__(self, bus, wps):
            TestDbus.__init__(self, bus)
            self.overlap_seen = False
            self.wps = wps

        def __enter__(self):
            gobject.timeout_add(1, self.start_pbc)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "pbc-overlap":
                return
            self.overlap_seen = True
            self.loop.quit()

        def start_pbc(self, *args):
            logger.debug("start_pbc")
            self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'})
            return False

        def success(self):
            return self.overlap_seen

    with TestDbusWps(bus, wps) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    for cmd in ("WPS_CANCEL", "DISCONNECT"):
        dev[0].request(cmd)
    hapd.disable()
    dev[0].flush_scan_cache()
737
def test_dbus_wps_pin(dev, apdev):
    """D-Bus WPS/PIN operation and signals"""
    try:
        _test_dbus_wps_pin(dev, apdev)
    finally:
        # The helper issues "SET wps_cred_processing 2"; set the parameter
        # to 0 whether or not the helper raised.
        dev[0].request("SET wps_cred_processing 0")
744
def _test_dbus_wps_pin(dev, apdev):
    # Enrollee-side WPS PIN run started over D-Bus. The AP is given the same
    # fixed PIN before the enrollee starts.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        # The loop is stopped once both the "success" WPS event and the
        # Credentials signal have been seen (in either order), or by the
        # 15 second timeout handler.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False
            self.success_seen = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.wpsEvent, "Event"),
                        (self.credentials, "Credentials"))
            for handler, signal in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE_WPS, signal)
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # BSSID is passed as raw octets
            raw_bssid = binascii.unhexlify(bssid.replace(':', '').encode())
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': dbus.ByteArray(raw_bssid)})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as session:
        completed = session.success()
        if not completed:
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
798
def test_dbus_wps_pin2(dev, apdev):
    """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)"""
    try:
        _test_dbus_wps_pin2(dev, apdev)
    finally:
        # The helper issues "SET wps_cred_processing 2"; set the parameter
        # to 0 whether or not the helper raised.
        dev[0].request("SET wps_cred_processing 0")
805
def _test_dbus_wps_pin2(dev, apdev):
    # Enrollee-side WPS PIN run where the PIN is taken from the return value
    # of the D-Bus Start() call and then handed to the AP.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        # The loop is stopped once both the "success" WPS event and the
        # Credentials signal have been seen (in either order), or by the
        # 15 second timeout handler.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.success_seen = False
            # Must exist before any signal is delivered: wpsEvent() and
            # success() read it even if no Credentials signal has arrived.
            self.credentials_received = False
            self.failed = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event")
            self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS,
                            "Credentials")
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "success":
                self.success_seen = True
                if self.credentials_received:
                    self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode()))
            # No 'Pin' entry here; the PIN is read from the Start() result.
            res = wps.Start({'Role': 'enrollee', 'Type': 'pin',
                             'Bssid': bssid_ay})
            pin = res['Pin']
            h = hostapd.Hostapd(apdev[0]['ifname'])
            h.request("WPS_PIN any " + pin)
            # One-shot timeout callback
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as t:
        if not t.success():
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
861
def test_dbus_wps_pin_m2d(dev, apdev):
    """D-Bus WPS/PIN operation and signals with M2D"""
    try:
        _test_dbus_wps_pin_m2d(dev, apdev)
    finally:
        # The helper issues "SET wps_cred_processing 2"; set the parameter
        # to 0 whether or not the helper raised.
        dev[0].request("SET wps_cred_processing 0")
868
def _test_dbus_wps_pin_m2d(dev, apdev):
    # Enrollee-side WPS PIN run where the AP does not get the PIN until the
    # enrollee has reported an "m2d" WPS event.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        # The loop is stopped once both the "success" WPS event and the
        # Credentials signal have been seen (in either order), or by the
        # 15 second timeout handler.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False
            self.success_seen = False

        def __enter__(self):
            gobject.timeout_add(1, self.start_pin)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.wpsEvent, "Event"),
                        (self.credentials, "Credentials"))
            for handler, signal in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE_WPS, signal)
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))
            if name == "m2d":
                # Only now configure the PIN on the AP
                ap = hostapd.Hostapd(apdev[0]['ifname'])
                ap.request("WPS_PIN any 12345670")
                return
            if name != "success":
                return
            self.success_seen = True
            if self.credentials_received:
                self.loop.quit()

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            if self.success_seen:
                self.loop.quit()

        def start_pin(self, *args):
            logger.debug("start_pin")
            # BSSID is passed as raw octets
            raw_bssid = binascii.unhexlify(bssid.replace(':', '').encode())
            wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                       'Bssid': dbus.ByteArray(raw_bssid)})
            return False

        def success(self):
            return self.success_seen and self.credentials_received

    with TestDbusWps(bus) as session:
        completed = session.success()
        if not completed:
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
924
def test_dbus_wps_reg(dev, apdev):
    """D-Bus WPS/Registrar operation and signals"""
    try:
        _test_dbus_wps_reg(dev, apdev)
    finally:
        # The helper issues "SET wps_cred_processing 2"; set the parameter
        # to 0 whether or not the helper raised.
        dev[0].request("SET wps_cred_processing 0")
931
def _test_dbus_wps_reg(dev, apdev):
    # WPS run with the station in the 'registrar' role, started over D-Bus.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    hapd.request("WPS_PIN any 12345670")
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq="2412")
    dev[0].request("SET wps_cred_processing 2")

    class TestDbusWps(TestDbus):
        # The loop is stopped by the first Credentials signal or by the
        # 15 second timeout handler; WPS events are only logged.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.credentials_received = False

        def __enter__(self):
            gobject.timeout_add(100, self.start_reg)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.wpsEvent, "Event"),
                        (self.credentials, "Credentials"))
            for handler, signal in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE_WPS, signal)
            self.loop.run()
            return self

        def wpsEvent(self, name, args):
            logger.debug("wpsEvent: %s args='%s'" % (name, str(args)))

        def credentials(self, args):
            logger.debug("credentials: " + str(args))
            self.credentials_received = True
            self.loop.quit()

        def start_reg(self, *args):
            logger.debug("start_reg")
            # BSSID is passed as raw octets
            raw_bssid = binascii.unhexlify(bssid.replace(':', '').encode())
            wps.Start({'Role': 'registrar', 'Type': 'pin',
                       'Pin': '12345670',
                       'Bssid': dbus.ByteArray(raw_bssid)})
            return False

        def success(self):
            return self.credentials_received

    with TestDbusWps(bus) as session:
        completed = session.success()
        if not completed:
            raise Exception("Failure in D-Bus operations")

    dev[0].wait_connected(timeout=10)
979
def test_dbus_wps_cancel(dev, apdev):
    """D-Bus WPS Cancel operation"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS)

    hapd = start_ap(apdev[0])
    target = apdev[0]['bssid']

    # First Cancel() is issued before any Start() call in this test
    wps.Cancel()
    dev[0].scan_for_bss(target, freq="2412")

    # Second Cancel() directly follows a PIN-based Start()
    target_raw = binascii.unhexlify(target.replace(':', '').encode())
    start_args = {'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670',
                  'Bssid': dbus.ByteArray(target_raw)}
    wps.Start(start_args)
    wps.Cancel()
    dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1)
995
def test_dbus_scan_invalid(dev, apdev):
    """D-Bus invalid scan method"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Every case below is expected to be rejected with this error name
    expected_err = "InvalidArgs"

    # 17 SSID entries: b"1", b"2", ..., b"17"
    many_ssids = [dbus.ByteArray(str(num).encode()) for num in range(1, 18)]
    # Single 33-octet SSID
    long_ssid = dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")

    invalid_args = [
        {},
        {'Type': 123},
        {'Type': 'foo'},
        {'Type': 'active', 'Foo': 'bar'},
        {'Type': 'active', 'SSIDs': 'foo'},
        {'Type': 'active', 'SSIDs': ['foo']},
        {'Type': 'active', 'SSIDs': many_ssids},
        {'Type': 'active', 'SSIDs': [long_ssid]},
        {'Type': 'active', 'IEs': 'foo'},
        {'Type': 'active', 'IEs': ['foo']},
        {'Type': 'active', 'Channels': 2412},
        {'Type': 'active', 'Channels': [2412]},
        {'Type': 'active',
         'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]},
        {'Type': 'active',
         'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]},
        {'Type': 'active', 'AllowRoam': "yes"},
        {'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]},
        {'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]},
    ]

    for scan_args in invalid_args:
        try:
            iface.Scan(scan_args)
        except dbus.exceptions.DBusException as e:
            if expected_err not in str(e):
                raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(scan_args), str(e)))
        else:
            raise Exception("Invalid Scan() arguments accepted: " + str(scan_args))
1043
def test_dbus_scan_oom(dev, apdev):
    """D-Bus scan method and OOM"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def channels():
        # Fresh single-entry channel list for each Scan() call
        return [(dbus.UInt32(2412), dbus.UInt32(20))]

    # (allocation failure pattern, extra alloc_fail_dbus() keyword arguments,
    #  Scan() arguments)
    cases = [
        ("wpa_scan_clone_params;wpas_dbus_handler_scan",
         {'expected': "ScanError: Scan request rejected"},
         {'Type': 'passive',
          'Channels': channels()}),
        ("=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan",
         {},
         {'Type': 'passive',
          'Channels': channels()}),
        ("=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan",
         {},
         {'Type': 'active',
          'IEs': [dbus.ByteArray(b"\xdd\x00")],
          'Channels': channels()}),
        ("=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan",
         {},
         {'Type': 'active',
          'SSIDs': [dbus.ByteArray(b"open"),
                    dbus.ByteArray()],
          'Channels': channels()}),
    ]

    for pattern, extra, scan_args in cases:
        with alloc_fail_dbus(dev[0], 1, pattern, "Scan", **extra):
            iface.Scan(scan_args)
1075
def test_dbus_scan(dev, apdev):
    """D-Bus scan and related signals"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})

    class TestDbusScan(TestDbus):
        # Chains three scans: run_scan() starts the first one and each of
        # the first two ScanDone signals starts the next one. The loop stops
        # once three scans have completed and a BSSAdded signal has been
        # seen, on an unexpected WPS entry, or on the 15 second timeout.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.fail_reason = None
            self.bss_added = False
            self.scan_completed = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_scan)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.scanDone, "ScanDone"),
                        (self.bssAdded, "BSSAdded"),
                        (self.bssRemoved, "BSSRemoved"))
            for handler, signal in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal)
            self.loop.run()
            return self

        def scanDone(self, success):
            logger.debug("scanDone: success=%s" % success)
            self.scan_completed += 1
            count = self.scan_completed
            if count == 1:
                second_scan = {'Type': 'passive',
                               'AllowRoam': True,
                               'Channels': [(dbus.UInt32(2412),
                                             dbus.UInt32(20))]}
                iface.Scan(second_scan)
            elif count == 2:
                third_scan = {'Type': 'passive',
                              'AllowRoam': False}
                iface.Scan(third_scan)
            elif self.bss_added and count == 3:
                self.loop.quit()

        def bssAdded(self, bss, properties):
            logger.debug("bssAdded: %s" % bss)
            logger.debug(str(properties))
            # The AP was started without WPS parameters in this test
            if 'WPS' in properties and 'Type' in properties['WPS']:
                self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS"
                self.loop.quit()
            self.bss_added = True
            if self.scan_completed == 3:
                self.loop.quit()

        def bssRemoved(self, bss):
            logger.debug("bssRemoved: %s" % bss)

        def run_scan(self, *args):
            logger.debug("run_scan")
            first_scan = {'Type': 'active',
                          'SSIDs': [dbus.ByteArray(b"open"),
                                    dbus.ByteArray()],
                          'IEs': [dbus.ByteArray(b"\xdd\x00"),
                                  dbus.ByteArray()],
                          'AllowRoam': False,
                          'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(first_scan)
            return False

        def success(self):
            return self.scan_completed == 3 and self.bss_added

    with TestDbusScan(bus) as session:
        if session.fail_reason:
            raise Exception(session.fail_reason)
        if not session.success():
            raise Exception("Expected signals not seen")

    def get_bss_list():
        # Current value of the interface's BSSs property
        return if_obj.Get(WPAS_DBUS_IFACE, "BSSs",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    if len(get_bss_list()) < 1:
        raise Exception("Scan result not in BSSs property")
    iface.FlushBSS(0)
    if len(get_bss_list()) != 0:
        raise Exception("FlushBSS() did not remove scan results from BSSs property")
    iface.FlushBSS(1)
1156
def test_dbus_scan_rand(dev, apdev):
    """D-Bus MACAddressRandomizationMask property Get/Set"""
    try:
        run_dbus_scan_rand(dev, apdev)
    finally:
        # The helper changes MACAddressRandomizationMask over D-Bus; always
        # issue this command afterwards, whether or not the helper raised.
        # NOTE(review): presumably this turns MAC randomization off for all
        # scan types - confirm against the MAC_RAND_SCAN command definition.
        dev[0].request("MAC_RAND_SCAN all enable=0")
1163
def run_dbus_scan_rand(dev, apdev):
    # Exercise Get/Set of the MACAddressRandomizationMask property: invalid
    # values must be rejected with specific error text, valid values must
    # show up in a following Get.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    prop = "MACAddressRandomizationMask"

    def set_mask(value):
        if_obj.Set(WPAS_DBUS_IFACE, prop, value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def check_mask_entries(count, failure):
        # Read the property back and verify the number of entries
        current = if_obj.Get(WPAS_DBUS_IFACE, prop,
                             dbus_interface=dbus.PROPERTIES_IFACE)
        if len(current) != count:
            logger.info(str(current))
            raise Exception(failure)

    def expect_rejected(value, error_text):
        # Set must fail with a D-Bus error containing error_text
        try:
            set_mask(value)
        except dbus.exceptions.DBusException as e:
            if error_text not in str(e):
                raise Exception("Unexpected error message: " + str(e))
        else:
            raise Exception("Invalid Set accepted")

    check_mask_entries(0,
                       "Unexpected initial MACAddressRandomizationMask value")

    expect_rejected("foo", "InvalidArgs: invalid message format")
    expect_rejected({"foo": "bar"},
                    "wpas_dbus_setter_mac_address_randomization_mask: mask was not a byte array")
    expect_rejected({"foo": dbus.ByteArray(b'123456')},
                    'wpas_dbus_setter_mac_address_randomization_mask: bad scan type "foo"')
    expect_rejected({"scan": dbus.ByteArray(b'12345')},
                    'wpas_dbus_setter_mac_address_randomization_mask: malformed MAC mask given')

    set_mask({"scan": dbus.ByteArray(b'123456')})
    check_mask_entries(1, "Unexpected MACAddressRandomizationMask value")

    try:
        set_mask({"scan": dbus.ByteArray(b'123456'),
                  "sched_scan": dbus.ByteArray(b'987654')})
    except dbus.exceptions.DBusException as e:
        # sched_scan is unlikely to be supported
        pass

    # An empty dictionary is expected to leave no entries behind
    set_mask(dbus.Dictionary({}, signature='sv'))
    check_mask_entries(0, "Unexpected MACAddressRandomizationMask value")
1235
def test_dbus_scan_busy(dev, apdev):
    """D-Bus scan trigger rejection when busy with previous scan"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a scan with dev[0].request() and wait until it has started
    response = dev[0].request("SCAN freq=2412-2462")
    if "OK" not in response:
        raise Exception("Failed to start scan")
    if dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) is None:
        raise Exception("Scan start timed out")

    # A D-Bus Scan() issued now must be rejected
    try:
        iface.Scan({'Type': 'active', 'AllowRoam': False})
    except dbus.exceptions.DBusException as e:
        if "ScanError: Scan request reject" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("Scan() accepted when busy")

    # Let the first scan finish before returning
    if dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) is None:
        raise Exception("Scan timed out")
1257
def test_dbus_scan_abort(dev, apdev):
    """D-Bus scan trigger and abort"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def require_event(name, failure):
        # Wait up to 15 seconds for the named event
        if dev[0].wait_event([name], 15) is None:
            raise Exception(failure)

    # First round: abort only after the scan is reported as started
    iface.Scan({'Type': 'active', 'AllowRoam': False})
    require_event("CTRL-EVENT-SCAN-STARTED", "Scan start timed out")
    iface.AbortScan()
    require_event("CTRL-EVENT-SCAN-RESULTS", "Scan abort result timed out")
    dev[0].dump_monitor()

    # Second round: abort immediately after triggering the scan
    iface.Scan({'Type': 'active', 'AllowRoam': False})
    iface.AbortScan()
    require_event("CTRL-EVENT-SCAN-RESULTS", "Scan timed out")
1279
def test_dbus_connect(dev, apdev):
    """D-Bus AddNetwork and connect"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        # Drives a sequence of D-Bus connection operations from the State
        # values reported in PropertiesChanged signals. self.state tracks the
        # step:
        #   0 --completed-->    1  Disconnect()
        #   1 --disconnected--> 2  SelectNetwork()
        #   2 --completed-->    3  Disconnect()
        #   3 --disconnected--> 4  Reassociate()
        #   4 --completed-->    5  Reattach()
        #   5 --completed-->    6  Disconnect()
        #   6 --disconnected--> 7  Reconnect()
        #   7 --completed-->    8  SignalPoll() check, RemoveNetwork()
        #   8 --disconnected--> 9  quit the main loop
        # state is set to -1 if the SignalPoll() result is unexpected.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            # run_connect() starts the sequence; timeout() is scheduled 15
            # seconds later as the fallback.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            # Steps taken when the interface reports State "completed"
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    iface.Disconnect()
                elif self.state == 7:
                    self.state = 8
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        # No step matches -1, so success() will return False
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            # Steps taken when the interface reports State "disconnected"
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    iface.Reconnect()
                elif self.state == 8:
                    self.state = 9
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            # Note: rebinds the unused *args name to the network parameters
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'scan_freq': 2412},
                                   signature='sv')
            # Return value of AddNetwork() is kept for the later
            # SelectNetwork()/RemoveNetwork() calls
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            # All three network signals must have been seen and the full
            # sequence must have reached its final step
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1382
def test_dbus_remove_connected(dev, apdev):
    """D-Bus RemoveAllNetworks while connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-open"
    hapd = hostapd.add_ap(apdev[0], {"ssid": ssid})

    class TestDbusConnect(TestDbus):
        # Drives a sequence of D-Bus connection operations from the State
        # values reported in PropertiesChanged signals. self.state tracks the
        # step:
        #   0 --completed-->    1  Disconnect()
        #   1 --disconnected--> 2  SelectNetwork()
        #   2 --completed-->    3  Disconnect()
        #   3 --disconnected--> 4  Reassociate()
        #   4 --completed-->    5  Reattach()
        #   5 --completed-->    6  Disconnect()
        #   6 --disconnected--> 7  Reconnect()
        #   7 --completed-->    8  SignalPoll() check, RemoveAllNetworks()
        #   8 --disconnected--> 9  quit the main loop
        # state is set to -1 if the SignalPoll() result is unexpected.
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            # run_connect() starts the sequence; timeout() is scheduled 15
            # seconds later as the fallback.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            # Steps taken when the interface reports State "completed"
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    iface.Disconnect()
                elif self.state == 7:
                    self.state = 8
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        # No step matches -1, so success() will return False
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    # Issued from the "completed" handler, i.e. while the
                    # connection is reported as up
                    iface.RemoveAllNetworks()
            # Steps taken when the interface reports State "disconnected"
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    iface.Reconnect()
                elif self.state == 8:
                    self.state = 9
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            # Note: rebinds the unused *args name to the network parameters
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'NONE',
                                    'scan_freq': 2412},
                                   signature='sv')
            # Return value of AddNetwork() is kept for the later
            # SelectNetwork() calls
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            # All three network signals must have been seen and the full
            # sequence must have reached its final step
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 9

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1482
def test_dbus_connect_psk_mem(dev, apdev):
    """D-Bus AddNetwork and connect with memory-only PSK"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Start a WPA2-PSK AP for the station to connect to.
    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    hapd = hostapd.add_ap(apdev[0],
                          hostapd.wpa2_params(ssid=ssid,
                                              passphrase=passphrase))

    class TestDbusConnect(TestDbus):
        """Add a mem_only_psk network and answer the passphrase request."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once the interface State property reports "completed".
            self.connected = False

        def __enter__(self):
            # Kick off the connection attempt right away and schedule
            # self.timeout after 15 seconds.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.propertiesChanged, "PropertiesChanged"),
                        (self.networkRequest, "NetworkRequest"))
            for handler, signal_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, signal_name)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') != "completed":
                return
            self.connected = True
            self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PSK_PASSPHRASE":
                return
            # The passphrase is sent wrapped in double quotation marks.
            iface.NetworkReply(path, field, '"' + passphrase + '"')

        def run_connect(self, *args):
            logger.debug("run_connect")
            # No PSK is included in the network block; mem_only_psk is set
            # and the passphrase is supplied from networkRequest().
            net_params = dbus.Dictionary({'ssid': ssid,
                                          'key_mgmt': 'WPA-PSK',
                                          'mem_only_psk': 1,
                                          'scan_freq': 2412},
                                         signature='sv')
            self.netw = iface.AddNetwork(net_params)
            iface.SelectNetwork(self.netw)
            # One-shot timeout callback.
            return False

        def success(self):
            return self.connected

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1536
def test_dbus_connect_oom(dev, apdev):
    """D-Bus AddNetwork and connect when out-of-memory"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Skip the test if the TEST_ALLOC_FAIL control command is not accepted.
    if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"):
        raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build")

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        """Drive a connect/disconnect sequence from D-Bus signals.

        self.state tracks progress through the sequence; success() expects
        it to have reached 7 and all three network signals to have been
        seen.
        """
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.network_added = False
            self.network_selected = False
            self.network_removed = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(1500, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE,
                            "NetworkRemoved")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))
            self.network_added = True

        def networkRemoved(self, network):
            logger.debug("networkRemoved: %s" % str(network))
            self.network_removed = True

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            # Sequence: completed -> Disconnect, disconnected ->
            # SelectNetwork, completed -> Disconnect, disconnected ->
            # Reassociate, completed -> Reattach, completed -> SignalPoll and
            # RemoveNetwork, disconnected -> quit the main loop.
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.Disconnect()
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                elif self.state == 4:
                    self.state = 5
                    iface.Reattach()
                elif self.state == 5:
                    self.state = 6
                    res = iface.SignalPoll()
                    logger.debug("SignalPoll: " + str(res))
                    if 'frequency' not in res or res['frequency'] != 2412:
                        # Mark the run as failed; success() requires state 7.
                        self.state = -1
                        logger.info("Unexpected SignalPoll result")
                    iface.RemoveNetwork(self.netw)
            if 'State' in properties and properties['State'] == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.Reassociate()
                elif self.state == 6:
                    self.state = 7
                    self.loop.quit()

        def run_connect(self, *args):
            logger.debug("run_connect")
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'scan_freq': 2412},
                                   signature='sv')
            # Either call may fail due to the injected allocation failure;
            # stop the main loop in that case instead of waiting for the
            # timeout.
            try:
                self.netw = iface.AddNetwork(args)
            except Exception as e:
                logger.info("Exception on AddNetwork: " + str(e))
                self.loop.quit()
                return False
            try:
                iface.SelectNetwork(self.netw)
            except Exception as e:
                logger.info("Exception on SelectNetwork: " + str(e))
                self.loop.quit()

            return False

        def success(self):
            if not self.network_added or \
               not self.network_removed or \
               not self.network_selected:
                return False
            return self.state == 7

    # Repeat the sequence with the i-th allocation set to fail, stopping
    # after five iterations in which GET_ALLOC_FAIL no longer reports a value
    # starting with "0:".
    count = 0
    for i in range(1, 1000):
        for j in range(3):
            dev[j].dump_monitor()
        dev[0].request("TEST_ALLOC_FAIL %d:main" % i)
        try:
            with TestDbusConnect(bus) as t:
                if not t.success():
                    logger.info("Iteration %d - Expected signals not seen" % i)
                else:
                    logger.info("Iteration %d - success" % i)

            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if i < 3:
                raise Exception("Connection succeeded during out-of-memory")
            if not state.startswith('0:'):
                count += 1
                if count == 5:
                    break
        except Exception as e:
            # Errors raised during an iteration are ignored and the loop
            # moves on to the next allocation failure position. Only
            # Exception is caught so that KeyboardInterrupt and SystemExit
            # can still stop the loop.
            # NOTE(review): this also swallows the "Connection succeeded
            # during out-of-memory" exception raised above - confirm whether
            # that check is meant to fail the test.
            logger.debug("Iteration %d - ignored exception: %s" % (i, str(e)))

    # Force regulatory update to re-fetch hw capabilities for the following
    # test cases.
    try:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', 'US'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
    finally:
        dev[0].dump_monitor()
        subprocess.call(['iw', 'reg', 'set', '00'])
        ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
1678
def test_dbus_while_not_connected(dev, apdev):
    """D-Bus invalid operations while not connected"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Each listed method is expected to be rejected with a NotConnected
    # error: (method name, message if accepted, prefix for wrong error).
    checks = (("Disconnect",
               "Disconnect() accepted when not connected",
               "Unexpected error message for invalid Disconnect: "),
              ("Reattach",
               "Reattach() accepted when not connected",
               "Unexpected error message for invalid Reattach: "))

    for method_name, accepted_msg, error_prefix in checks:
        try:
            getattr(iface, method_name)()
        except dbus.exceptions.DBusException as e:
            if "NotConnected" in str(e):
                continue
            raise Exception(error_prefix + str(e))
        # No D-Bus error at all means the invalid call was accepted.
        raise Exception(accepted_msg)
1697
def test_dbus_connect_eap(dev, apdev):
    """D-Bus AddNetwork and connect to EAP network"""
    check_altsubject_match_support(dev[0])
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "ieee8021x-open"
    params = hostapd.radius_params()
    params["ssid"] = ssid
    params["ieee8021x"] = "1"
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        """Connect with EAP-TTLS and then exercise altsubject_match.

        self.state advances 0..5; success() expects 5 together with the
        Certification signal and an EAP completion/success status.
        """

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.certification_received = False
            self.eap_status = False
            self.state = 0

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.certification, WPAS_DBUS_IFACE,
                            "Certification", byte_arrays=True)
            self.add_signal(self.networkRequest, WPAS_DBUS_IFACE,
                            "NetworkRequest")
            self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP")
            self.loop.run()
            return self

        def _set_altsubject_match(self, mismatch):
            # Update altsubject_match of the added network to the dNSName
            # learned from the server certificate, with "FOO" appended when
            # a non-matching value is requested.
            net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw)
            if mismatch:
                constraint = self.server_dnsname + "FOO"
            else:
                constraint = self.server_dnsname
            props = dbus.Dictionary({'altsubject_match': constraint},
                                    signature='sv')
            net_obj.Set(WPAS_DBUS_NETWORK, "Properties", props,
                        dbus_interface=dbus.PROPERTIES_IFACE)

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            new_state = properties.get('State')
            if new_state == "completed":
                if self.state == 0:
                    self.state = 1
                    iface.EAPLogoff()
                    logger.info("Set dNSName constraint")
                    self._set_altsubject_match(False)
                elif self.state == 2:
                    self.state = 3
                    iface.Disconnect()
                    logger.info("Set non-matching dNSName constraint")
                    self._set_altsubject_match(True)
            if new_state == "disconnected":
                if self.state == 1:
                    self.state = 2
                    iface.EAPLogon()
                    iface.SelectNetwork(self.netw)
                elif self.state == 3:
                    self.state = 4
                    iface.SelectNetwork(self.netw)

        def certification(self, args):
            logger.debug("certification: %s" % str(args))
            self.certification_received = True
            if args['depth'] != 0:
                return
            # The test server certificate is supposed to have dNSName
            altsubject = args['altsubject']
            if len(altsubject) < 1:
                raise Exception("Missing dNSName")
            dnsname = altsubject[0]
            if not dnsname.startswith("DNS:"):
                raise Exception("Expected dNSName not found: " + dnsname)
            logger.info("altsubject: " + dnsname)
            self.server_dnsname = dnsname

        def eap(self, status, parameter):
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))
            if status == 'completion' and parameter == 'success':
                self.eap_status = True
            # The final step is the rejection caused by the non-matching
            # altsubject_match value.
            altsubject_mismatch = (status == 'remote certificate verification'
                                   and parameter == 'AltSubject mismatch')
            if self.state == 4 and altsubject_mismatch:
                self.state = 5
                self.loop.quit()

        def networkRequest(self, path, field, txt):
            logger.debug("networkRequest: %s %s %s" % (path, field, txt))
            if field != "PASSWORD":
                return
            iface.NetworkReply(path, field, "password")

        def run_connect(self, *args):
            logger.debug("run_connect")
            # No password is configured here; it is supplied from
            # networkRequest() when a PASSWORD request is received.
            net_params = dbus.Dictionary({'ssid': ssid,
                                          'key_mgmt': 'IEEE8021X',
                                          'eapol_flags': 0,
                                          'eap': 'TTLS',
                                          'anonymous_identity': 'ttls',
                                          'identity': 'pap user',
                                          'ca_cert': 'auth_serv/ca.pem',
                                          'phase2': 'auth=PAP',
                                          'scan_freq': 2412},
                                         signature='sv')
            self.netw = iface.AddNetwork(net_params)
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            if self.eap_status and self.certification_received:
                return self.state == 5
            return False

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
1812
def test_dbus_network(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Network block using explicitly typed D-Bus values.
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'WPA-PSK',
                            'psk': "12345678",
                            'identity': dbus.ByteArray([1, 2]),
                            'priority': dbus.Int32(0),
                            'scan_freq': dbus.UInt32(2412)},
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412":
        raise Exception("Invalid scan_freq value: " + str(val))
    iface.RemoveNetwork(netw)

    # Frequency lists given as space-separated strings.
    args = dbus.Dictionary({'ssid': "foo",
                            'key_mgmt': 'NONE',
                            'scan_freq': "2412 2432",
                            'freq_list': "2412 2417 2432"},
                           signature='sv')
    netw = iface.AddNetwork(args)
    net_id = int(dev[0].list_networks()[0]['id'])
    val = dev[0].get_network(net_id, "scan_freq")
    if val != "2412 2432":
        raise Exception("Invalid scan_freq value (2): " + str(val))
    val = dev[0].get_network(net_id, "freq_list")
    if val != "2412 2417 2432":
        raise Exception("Invalid freq_list value: " + str(val))
    iface.RemoveNetwork(netw)

    # Operations on the already removed network must be rejected.
    try:
        iface.RemoveNetwork(netw)
        raise Exception("Invalid RemoveNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e))
    try:
        iface.SelectNetwork(netw)
        raise Exception("Invalid SelectNetwork() accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid SelectNetwork: " + str(e))

    args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE',
                            'identity': "testuser", 'scan_freq': '2412'},
                           signature='sv')
    netw1 = iface.AddNetwork(args)
    args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'},
                           signature='sv')
    netw2 = iface.AddNetwork(args)
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 2:
        raise Exception("Unexpected number of networks")

    # Enabled property: initial value, toggling, and wrong value type.
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Added network was unexpectedly enabled by default")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != True:
        raise Exception("Set(Enabled,True) did not seem to change property value")
    net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False),
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res != False:
        raise Exception("Set(Enabled,False) did not seem to change property value")
    try:
        net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1),
                    dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Set(Enabled,1) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e))

    # Updating one parameter through Properties must leave the others as-is.
    args = dbus.Dictionary({'ssid': "foo1new"}, signature='sv')
    net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args,
                dbus_interface=dbus.PROPERTIES_IFACE)
    res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                      dbus_interface=dbus.PROPERTIES_IFACE)
    if res['ssid'] != '"foo1new"':
        raise Exception("Set(Properties) failed to update ssid")
    if res['identity'] != '"testuser"':
        raise Exception("Set(Properties) unexpectedly changed unrelated parameter")

    iface.RemoveAllNetworks()
    res = if_obj.Get(WPAS_DBUS_IFACE, "Networks",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != 0:
        raise Exception("Unexpected number of networks")
    # Calling RemoveAllNetworks() again with no networks left must not fail.
    iface.RemoveAllNetworks()

    # Argument sets that AddNetwork() is expected to reject with InvalidArgs.
    tests = [dbus.Dictionary({'psk': "1234567"}, signature='sv'),
             dbus.Dictionary({'identity': dbus.ByteArray()},
                             signature='sv'),
             dbus.Dictionary({'identity': dbus.Byte(1)}, signature='sv')]
    for args in tests:
        try:
            iface.AddNetwork(args)
            raise Exception("Invalid AddNetwork args accepted: " + str(args))
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddNetwork: " + str(e))
1924
def test_dbus_network_oom(dev, apdev):
    """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def sv_dict(entries):
        # Wrap a plain dict into a D-Bus a{sv} dictionary.
        return dbus.Dictionary(entries, signature='sv')

    netw1 = iface.AddNetwork(sv_dict({'ssid': "foo1", 'key_mgmt': 'NONE',
                                      'identity': "testuser",
                                      'scan_freq': '2412'}))
    net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1)

    # Allocation failure while reading the network Properties.
    with alloc_fail_dbus(dev[0], 1,
                         "wpa_config_get_all;wpas_dbus_getter_network_properties",
                         "Get"):
        net_obj.Get(WPAS_DBUS_NETWORK, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE)

    iface.RemoveAllNetworks()

    # Allocation failure while decomposing the RemoveNetwork object path.
    with alloc_fail_dbus(dev[0], 1,
                         "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network",
                         "RemoveNetwork", "InvalidArgs"):
        bogus_path = "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"
        iface.RemoveNetwork(dbus.ObjectPath(bogus_path))

    with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"):
        foo2_args = sv_dict({'ssid': "foo2", 'key_mgmt': 'NONE'})
        try:
            netw = iface.AddNetwork(foo2_args)
            # Currently, AddNetwork() succeeds even if os_strdup() for path
            # fails, so remove the network if that occurs.
            iface.RemoveNetwork(netw)
        except dbus.exceptions.DBusException as e:
            pass

    for fail_count in range(1, 3):
        with alloc_fail(dev[0], fail_count, "=wpas_dbus_register_network"):
            try:
                netw = iface.AddNetwork(foo2_args)
                # Currently, AddNetwork() succeeds even if network registration
                # fails, so remove the network if that occurs.
                iface.RemoveNetwork(netw)
            except dbus.exceptions.DBusException as e:
                pass

    with alloc_fail_dbus(dev[0], 1,
                         "=wpa_config_add_network;wpas_dbus_handler_add_network",
                         "AddNetwork",
                         "UnknownError: wpa_supplicant could not add a network"):
        netw = iface.AddNetwork(sv_dict({'ssid': "foo2",
                                         'key_mgmt': 'NONE'}))

    # Allocation failures while processing the AddNetwork() properties; each
    # entry is (failure count, function spec, AddNetwork arguments).
    set_props = '=set_network_properties;wpas_dbus_handler_add_network'
    tests = [(1,
              'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network',
              sv_dict({'ssid': dbus.ByteArray(b' ')})),
             (1, set_props, sv_dict({'ssid': 'foo'})),
             (1, set_props, sv_dict({'eap': 'foo'})),
             (1, set_props, sv_dict({'priority': dbus.UInt32(1)})),
             (1, set_props, sv_dict({'priority': dbus.Int32(1)})),
             (1, set_props, sv_dict({'ssid': dbus.ByteArray(b' ')}))]
    for (count, funcs, net_args) in tests:
        with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"):
            netw = iface.AddNetwork(net_args)

    # None of the failed attempts may have left a network block behind.
    networks = if_obj.Get(WPAS_DBUS_IFACE, 'Networks',
                          dbus_interface=dbus.PROPERTIES_IFACE)
    if len(networks) > 0:
        raise Exception("Unexpected network block added")
    if len(dev[0].list_networks()) > 0:
        raise Exception("Unexpected network block visible")
2004
def test_dbus_interface(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases"""
    try:
        _test_dbus_interface(dev, apdev)
    finally:
        # Need to force P2P channel list update since the 'lo' interface
        # with driver=none ends up configuring default dualband channels.
        sta = dev[0]
        for country_cmd in ("SET country US", "SET country 00"):
            sta.request(country_cmd)
            # Wait for the regulatory domain change event on the interface
            # first and fall back to waiting for it as a global event.
            ev = sta.wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
            if ev is None:
                ev = sta.wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"],
                                           timeout=1)
        subprocess.call(['iw', 'reg', 'set', '00'])
2023
def _test_dbus_interface(dev, apdev):
    """Exercise CreateInterface/GetInterface/RemoveInterface on 'lo'.

    Raises Exception if an invalid call is accepted or if it is rejected
    with an unexpected D-Bus error.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    # Create an interface for 'lo' and verify that it can be looked up.
    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                             signature='sv')
    path = wpas.CreateInterface(params)
    logger.debug("New interface path: " + str(path))
    path2 = wpas.GetInterface("lo")
    if path != path2:
        raise Exception("Interface object mismatch")

    # Creating the same interface again must fail.
    params = dbus.Dictionary({'Ifname': 'lo',
                              'Driver': 'none',
                              'ConfigFile': 'foo',
                              'BridgeIfname': 'foo',},
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceExists" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Removing the interface twice must fail on the second attempt.
    wpas.RemoveInterface(path)
    try:
        wpas.RemoveInterface(path)
        raise Exception("Invalid RemoveInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e))

    # Unknown dictionary entry.
    params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none',
                              'Foo': 123},
                             signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # Missing Ifname.
    params = dbus.Dictionary({'Driver': 'none'}, signature='sv')
    try:
        wpas.CreateInterface(params)
        raise Exception("Invalid CreateInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid CreateInterface: " + str(e))

    # The interface was removed above, so looking it up must fail.
    try:
        wpas.GetInterface("lo")
        raise Exception("Invalid GetInterface() accepted")
    except dbus.exceptions.DBusException as e:
        if "InterfaceUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetInterface: " + str(e))
2080
def test_dbus_interface_oom(dev, apdev):
    """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)

    def lo_params():
        # Arguments for creating an interface on 'lo' with driver 'none'.
        return dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'},
                               signature='sv')

    with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"):
        params = lo_params()
        wpas.CreateInterface(params)

    # Move the failing allocation forward one step at a time until
    # CreateInterface() succeeds without the failure having been consumed.
    add_iface_funcs = "wpa_supplicant_add_iface;wpas_dbus_handler_create_interface"
    for attempt in range(1, 1000):
        dev[0].request("TEST_ALLOC_FAIL %d:%s" % (attempt, add_iface_funcs))
        params = lo_params()
        try:
            new_path = wpas.CreateInterface(params)
            wpas.RemoveInterface(new_path)
            logger.info("CreateInterface succeeds after %d allocation failures" % attempt)
            state = dev[0].request('GET_ALLOC_FAIL')
            logger.info("GET_ALLOC_FAIL: " + state)
            dev[0].dump_monitor()
            dev[0].request("TEST_ALLOC_FAIL 0:")
            if attempt < 5:
                raise Exception("CreateInterface succeeded during out-of-memory")
            if not state.startswith('0:'):
                break
        except dbus.exceptions.DBusException as e:
            # D-Bus errors are ignored; try the next failure position.
            pass

    # Allocation failure in the CreateInterface handler for each argument.
    for key in ('Driver', 'Ifname', 'ConfigFile', 'BridgeIfname'):
        with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface",
                             "CreateInterface"):
            params = dbus.Dictionary({key: 'foo'}, signature='sv')
            wpas.CreateInterface(params)
2115
def test_dbus_blob(dev, apdev):
    """D-Bus AddBlob/GetBlob/RemoveBlob parameters and error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    blob = dbus.ByteArray(b"\x01\x02\x03")
    iface.AddBlob('blob1', blob)

    # A second AddBlob() using an already existing name has to be rejected.
    try:
        iface.AddBlob('blob1', dbus.ByteArray(b"\x01\x02\x04"))
    except dbus.exceptions.DBusException as e:
        if "BlobExists" not in str(e):
            raise Exception("Unexpected error message for invalid AddBlob: " + str(e))
    else:
        raise Exception("Invalid AddBlob() accepted")

    # The data read back is compared against the first blob, byte by byte.
    res = iface.GetBlob('blob1')
    if len(res) != len(blob):
        raise Exception("Unexpected blob data length")
    for got, expected in zip(res, blob):
        if got != dbus.Byte(expected):
            raise Exception("Unexpected blob data")

    res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if 'blob1' not in res:
        raise Exception("Added blob missing from Blobs property")

    iface.RemoveBlob('blob1')

    # After removal, the name has to be unknown to both RemoveBlob() and
    # GetBlob().
    try:
        iface.RemoveBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e))
    else:
        raise Exception("Invalid RemoveBlob() accepted")

    try:
        iface.GetBlob('blob1')
    except dbus.exceptions.DBusException as e:
        if "BlobUnknown" not in str(e):
            raise Exception("Unexpected error message for invalid GetBlob: " + str(e))
    else:
        raise Exception("Invalid GetBlob() accepted")

    # Signal part of the test: add and remove 'blob2' from within the main
    # loop and verify that both BlobAdded and BlobRemoved are delivered.
    # NOTE(review): loop, add_signal() and timeout() come from TestDbus, which
    # is defined elsewhere in this file.
    class TestDbusBlob(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.blob_added = False
            self.blob_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_blob)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded")
            self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved")
            self.loop.run()
            return self

        def blobAdded(self, blobName):
            logger.debug("blobAdded: %s" % blobName)
            if blobName == 'blob2':
                self.blob_added = True

        def blobRemoved(self, blobName):
            logger.debug("blobRemoved: %s" % blobName)
            if blobName == 'blob2':
                self.blob_removed = True
                # The removal signal is the last expected event.
                self.loop.quit()

        def run_blob(self, *args):
            logger.debug("run_blob")
            iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04"))
            iface.RemoveBlob('blob2')
            # Returning False keeps this timeout callback from repeating.
            return False

        def success(self):
            return self.blob_added and self.blob_removed

    with TestDbusBlob(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2190
def test_dbus_blob_oom(dev, apdev):
    """D-Bus AddBlob OOM error cases"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    data = dbus.ByteArray(b"\x01\x02\x03\x04")
    # Run AddBlob() once for each of the counts 1..3 passed to
    # alloc_fail_dbus() for wpas_dbus_handler_add_blob.
    # NOTE(review): alloc_fail_dbus() is defined elsewhere in this file;
    # presumably it triggers the allocation failure and checks that the
    # method call fails - confirm there.
    for count in range(1, 4):
        with alloc_fail_dbus(dev[0], count, "wpas_dbus_handler_add_blob",
                             "AddBlob"):
            iface.AddBlob('blob_no_mem', data)
2200
def test_dbus_autoscan(dev, apdev):
    """D-Bus Autoscan()"""
    sta = dev[0]
    _, _, _, if_obj = prepare_dbus(sta)
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Call AutoScan() with a few different parameter strings. None of the
    # calls is wrapped in error handling, so any D-Bus error fails the test.
    for config in ("foo", "periodic:1", ""):
        iface.AutoScan(config)

    # Issue an AUTOSCAN command with an empty parameter over the control
    # interface as well before finishing.
    sta.request("AUTOSCAN ")
2210
def test_dbus_autoscan_oom(dev, apdev):
    """D-Bus Autoscan() OOM"""
    sta = dev[0]
    _, _, _, if_obj = prepare_dbus(sta)
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # NOTE(review): alloc_fail_dbus() is defined elsewhere in this file;
    # presumably it makes the first allocation in wpas_dbus_handler_autoscan
    # fail and validates the resulting D-Bus error - confirm there.
    oom = alloc_fail_dbus(sta, 1, "wpas_dbus_handler_autoscan", "AutoScan")
    with oom:
        iface.AutoScan("foo")

    # Issue an AUTOSCAN command with an empty parameter before finishing.
    sta.request("AUTOSCAN ")
2219
def test_dbus_tdls_invalid(dev, apdev):
    """D-Bus invalid TDLS operations"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, hapd)
    addr1 = dev[1].p2p_interface_addr()

    def expect_invalid_args(method, arg):
        # Invoke iface.<method>(arg) and require that it is rejected with a
        # D-Bus error mentioning InvalidArgs.
        try:
            getattr(iface, method)(arg)
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid " +
                                method + ": " + str(e))
        else:
            raise Exception("Invalid " + method + "() accepted")

    expect_invalid_args("TDLSDiscover", "foo")
    expect_invalid_args("TDLSStatus", "foo")

    # A well-formed address without a TDLS link gets a status string instead
    # of an error.
    status = iface.TDLSStatus(addr1)
    if status != "peer does not exist":
        raise Exception("Unexpected TDLSStatus response")

    expect_invalid_args("TDLSSetup", "foo")
    expect_invalid_args("TDLSTeardown", "foo")

    # A syntactically valid address for which no peer exists is expected to
    # produce a different error than a malformed address.
    try:
        iface.TDLSTeardown("00:11:22:33:44:55")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: error performing TDLS teardown" not in str(e):
            raise Exception("Unexpected error message: " + str(e))
    else:
        raise Exception("TDLSTeardown accepted for unknown peer")

    expect_invalid_args("TDLSChannelSwitch", {})
    expect_invalid_args("TDLSCancelChannelSwitch", "foo")
2281
def test_dbus_tdls_oom(dev, apdev):
    """D-Bus TDLS operations during OOM"""
    sta = dev[0]
    _, _, _, if_obj = prepare_dbus(sta)
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # NOTE(review): alloc_fail_dbus() is defined elsewhere in this file; the
    # last argument is presumably the error text expected from the failed
    # call - confirm there.
    expected_error = "UnknownError: error performing TDLS setup"
    with alloc_fail_dbus(sta, 1, "wpa_tdls_add_peer", "TDLSSetup",
                         expected_error):
        iface.TDLSSetup("00:11:22:33:44:55")
2290
def test_dbus_tdls(dev, apdev):
    """D-Bus TDLS"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ap = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, ap)

    addr1 = dev[1].p2p_interface_addr()

    # The TDLS steps are chained through timeout callbacks:
    # discover -> setup -> status check + teardown -> status check + quit.
    # NOTE(review): loop, add_signal() and timeout() come from TestDbus, which
    # is defined elsewhere in this file.
    class TestDbusTdls(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once TDLSStatus reports "connected" after setup
            self.tdls_setup = False
            # Set once TDLSStatus reports "peer does not exist" after teardown
            self.tdls_teardown = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Only logged; the result does not depend on this signal.
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True
            # Teardown is requested regardless of the reported status.
            iface.TDLSTeardown(addr1)
            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            status = iface.TDLSStatus(addr1)
            if status != "peer does not exist":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_teardown = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_teardown

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2357
def test_dbus_tdls_channel_switch(dev, apdev):
    """D-Bus TDLS channel switch configuration"""
    # Skip the test unless bit 0x800000000 is set in the driver capability
    # flags.
    capa_flags = int(dev[0].get_driver_status_field('capa.flags'), 16)
    if not (capa_flags & 0x800000000):
        raise HwsimSkip("Driver does not support TDLS channel switching")

    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ap = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    connect_2sta_open(dev, ap)

    addr1 = dev[1].p2p_interface_addr()

    # The steps are chained through timeout callbacks:
    # discover -> setup -> channel switch requests -> cancel + quit.
    # NOTE(review): loop, add_signal() and timeout() come from TestDbus, which
    # is defined elsewhere in this file.
    class TestDbusTdls(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once TDLSStatus reports "connected" after setup
            self.tdls_setup = False
            # Set once the channel switch has been cancelled
            self.tdls_done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_tdls)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            # Only logged; the result does not depend on this signal.
            logger.debug("propertiesChanged: %s" % str(properties))

        def run_tdls(self, *args):
            logger.debug("run_tdls")
            iface.TDLSDiscover(addr1)
            gobject.timeout_add(100, self.run_tdls2)
            return False

        def run_tdls2(self, *args):
            logger.debug("run_tdls2")
            iface.TDLSSetup(addr1)
            gobject.timeout_add(500, self.run_tdls3)
            return False

        def run_tdls3(self, *args):
            logger.debug("run_tdls3")
            status = iface.TDLSStatus(addr1)
            if status != "connected":
                logger.info("Unexpected TDLSStatus: " + status)
            else:
                self.tdls_setup = True

            invalid_cases = [
                # Unknown dict entry
                {'Foobar': dbus.Byte(1)},
                # Missing OperClass
                {},
                # Missing Frequency
                {'OperClass': dbus.Byte(1)},
                # Missing PeerAddress
                {'OperClass': dbus.Byte(1),
                 'Frequency': dbus.UInt32(2417)},
            ]
            # NOTE(review): only the error text is validated here; a call that
            # unexpectedly succeeds is not reported as a failure.
            for entries in invalid_cases:
                params = dbus.Dictionary(entries, signature='sv')
                try:
                    iface.TDLSChannelSwitch(params)
                except Exception as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected exception")

            # Valid parameters
            valid = {'OperClass': dbus.Byte(1),
                     'Frequency': dbus.UInt32(2417),
                     'PeerAddress': addr1,
                     'SecChannelOffset': dbus.UInt32(0),
                     'CenterFrequency1': dbus.UInt32(0),
                     'CenterFrequency2': dbus.UInt32(0),
                     'Bandwidth': dbus.UInt32(20),
                     'HT': dbus.Boolean(False),
                     'VHT': dbus.Boolean(False)}
            iface.TDLSChannelSwitch(dbus.Dictionary(valid, signature='sv'))

            gobject.timeout_add(200, self.run_tdls4)
            return False

        def run_tdls4(self, *args):
            logger.debug("run_tdls4")
            iface.TDLSCancelChannelSwitch(addr1)
            self.tdls_done = True
            self.loop.quit()
            return False

        def success(self):
            return self.tdls_setup and self.tdls_done

    with TestDbusTdls(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
2474
def test_dbus_pkcs11(dev, apdev):
    """D-Bus SetPKCS11EngineAndModulePath()"""
    _, _, _, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def get_prop(name):
        # Read one property of the wpa_supplicant interface object.
        return if_obj.Get(WPAS_DBUS_IFACE, name,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    # With engine path "foo" the call may either succeed or fail; a failure is
    # accepted only if it carries the EAPOL reinit error text.
    for module_path in ("bar", ""):
        try:
            iface.SetPKCS11EngineAndModulePath("foo", module_path)
        except dbus.exceptions.DBusException as e:
            if "Error.Failed: Reinit of the EAPOL" not in str(e):
                raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e))

    # With an empty engine path the call has to succeed and both properties
    # have to read back the values that were just set.
    for module_path in ("bar", ""):
        iface.SetPKCS11EngineAndModulePath("", module_path)
        engine = get_prop("PKCS11EnginePath")
        if engine != "":
            raise Exception("Unexpected PKCS11EnginePath value: " + engine)
        module = get_prop("PKCS11ModulePath")
        if module != module_path:
            raise Exception("Unexpected PKCS11ModulePath value: " + module)
2511
def test_dbus_apscan(dev, apdev):
    """D-Bus Get/Set ApScan"""
    sta = dev[0]
    try:
        _test_dbus_apscan(dev, apdev)
    finally:
        # Always put AP_SCAN back to 1, no matter how the test body ended.
        sta.request("AP_SCAN 1")
2518
def _test_dbus_apscan(dev, apdev):
    # Body of test_dbus_apscan(): read and write the ApScan property with
    # valid and invalid values.
    _, _, _, if_obj = prepare_dbus(dev[0])

    def get_ap_scan():
        return if_obj.Get(WPAS_DBUS_IFACE, "ApScan",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def set_ap_scan(value):
        if_obj.Set(WPAS_DBUS_IFACE, "ApScan", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    initial = get_ap_scan()
    if initial != 1:
        raise Exception("Unexpected initial ApScan value: %d" % initial)

    # Each of the values 0, 1 and 2 has to be accepted and read back.
    for expected in range(3):
        set_ap_scan(dbus.UInt32(expected))
        current = get_ap_scan()
        if current != expected:
            raise Exception("Unexpected ApScan value %d (expected %d)" % (current, expected))

    # Wrong D-Bus type
    try:
        set_ap_scan(dbus.Int16(-1))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,-1) accepted")

    # Correct type, value out of range
    try:
        set_ap_scan(dbus.UInt32(123))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e))
    else:
        raise Exception("Invalid Set(ApScan,123) accepted")

    set_ap_scan(dbus.UInt32(1))
2553
def test_dbus_pmf(dev, apdev):
    """D-Bus Get/Set Pmf"""
    sta = dev[0]
    try:
        _test_dbus_pmf(dev, apdev)
    finally:
        # Always set pmf back to 0, no matter how the test body ended.
        sta.request("SET pmf 0")
2560
def _test_dbus_pmf(dev, apdev):
    # Body of test_dbus_pmf(): read and write the Pmf property, which is
    # exchanged as a string.
    sta = dev[0]
    _, _, _, if_obj = prepare_dbus(sta)

    def get_pmf():
        return if_obj.Get(WPAS_DBUS_IFACE, "Pmf",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def set_pmf(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Pmf", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Start from a known value set through the control interface.
    sta.set("pmf", "0")
    initial = get_pmf()
    if initial != "0":
        raise Exception("Unexpected initial Pmf value: %s" % initial)

    for level in range(3):
        wanted = str(level)
        set_pmf(wanted)
        current = get_pmf()
        if current != wanted:
            raise Exception("Unexpected Pmf value %s (expected %d)" % (current, level))

    set_pmf("1")
2580
def test_dbus_fastreauth(dev, apdev):
    """D-Bus Get/Set FastReauth"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def get_fast_reauth():
        return if_obj.Get(WPAS_DBUS_IFACE, "FastReauth",
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def set_fast_reauth(value):
        if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    res = get_fast_reauth()
    if res != True:
        raise Exception("Unexpected initial FastReauth value: " + str(res))

    # Both boolean values have to be accepted and read back.
    for enabled in [False, True]:
        set_fast_reauth(dbus.Boolean(enabled))
        res = get_fast_reauth()
        if res != enabled:
            raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, enabled))

    # A non-boolean D-Bus type has to be rejected. The failure message names
    # FastReauth (it previously named ApScan due to a copy-paste error).
    try:
        set_fast_reauth(dbus.Int16(-1))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(FastReauth,-1): " + str(e))
    else:
        raise Exception("Invalid Set(FastReauth,-1) accepted")

    # Leave the property at the value that was expected initially.
    set_fast_reauth(dbus.Boolean(True))
2608
def test_dbus_bss_expire(dev, apdev):
    """D-Bus Get/Set BSSExpireAge and BSSExpireCount"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def get_prop(name):
        return if_obj.Get(WPAS_DBUS_IFACE, name,
                          dbus_interface=dbus.PROPERTIES_IFACE)

    def set_prop(name, value):
        if_obj.Set(WPAS_DBUS_IFACE, name, value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Valid values have to be accepted and read back. The expected value is
    # kept in a variable so that the failure message can report it (the
    # message previously referenced an undefined name and would have raised
    # NameError instead of the intended exception).
    for name, expected in [("BSSExpireAge", 179), ("BSSExpireCount", 3)]:
        set_prop(name, dbus.UInt32(expected))
        res = get_prop(name)
        if res != expected:
            raise Exception("Unexpected %s value %d (expected %d)" % (name, res, expected))

    # (property, invalid value, value as shown in messages, expected error)
    invalid_cases = [
        ("BSSExpireAge", dbus.Int16(-1), "-1",
         "Error.Failed: wrong property type"),
        ("BSSExpireAge", dbus.UInt32(9), "9",
         "Error.Failed: BSSExpireAge must be >= 10"),
        ("BSSExpireCount", dbus.Int16(-1), "-1",
         "Error.Failed: wrong property type"),
        ("BSSExpireCount", dbus.UInt32(0), "0",
         "Error.Failed: BSSExpireCount must be > 0"),
    ]
    for name, value, shown, expected_error in invalid_cases:
        desc = "Set(%s,%s)" % (name, shown)
        try:
            set_prop(name, value)
        except dbus.exceptions.DBusException as e:
            if expected_error not in str(e):
                raise Exception("Unexpected error message for invalid " + desc + ": " + str(e))
        else:
            raise Exception("Invalid " + desc + " accepted")

    # Final values written before the test ends.
    set_prop("BSSExpireAge", dbus.UInt32(180))
    set_prop("BSSExpireCount", dbus.UInt32(2))
2663
def test_dbus_country(dev, apdev):
    """D-Bus Get/Set Country"""
    sta = dev[0]
    reset_regdom = ['iw', 'reg', 'set', '00']
    try:
        _test_dbus_country(dev, apdev)
    finally:
        # Always go back to country "00", both in wpa_supplicant and through
        # the iw tool, no matter how the test body ended.
        sta.request("SET country 00")
        subprocess.call(reset_regdom)
2671
def _test_dbus_country(dev, apdev):
    # Body of test_dbus_country(): set the Country property and verify the
    # resulting regulatory domain change events.
    sta = dev[0]
    _, _, _, if_obj = prepare_dbus(sta)

    def set_country(value):
        if_obj.Set(WPAS_DBUS_IFACE, "Country", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    def wait_regdom_change():
        # Return the next regdom change event or raise if none is seen.
        event = sta.wait_event(["CTRL-EVENT-REGDOM-CHANGE"])
        if event is not None:
            return event
        # For now, work around separate P2P Device interface event delivery
        event = sta.wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1)
        if event is None:
            raise Exception("regdom change event not seen")
        return event

    # work around issues with possible pending regdom event from the end of
    # the previous test case
    time.sleep(0.2)
    sta.dump_monitor()

    set_country("FI")
    country = if_obj.Get(WPAS_DBUS_IFACE, "Country",
                         dbus_interface=dbus.PROPERTIES_IFACE)
    if country != "FI":
        raise Exception("Unexpected Country value %s (expected FI)" % country)

    ev = wait_regdom_change()
    if "init=USER type=COUNTRY alpha2=FI" not in ev:
        raise Exception("Unexpected event contents: " + ev)

    # Wrong D-Bus type
    try:
        set_country(dbus.Int16(-1))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e))
    else:
        raise Exception("Invalid Set(Country,-1) accepted")

    # Correct type, but not a valid country code
    try:
        set_country("F")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: invalid country code" not in str(e):
            raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e))
    else:
        raise Exception("Invalid Set(Country,F) accepted")

    set_country("00")

    ev = wait_regdom_change()
    # init=CORE was previously used due to invalid db.txt data for 00. For
    # now, allow both it and the new init=USER after fixed db.txt.
    if not ("init=CORE type=WORLD" in ev or "init=USER type=WORLD" in ev):
        raise Exception("Unexpected event contents: " + ev)
2725
def test_dbus_scan_interval(dev, apdev):
    """D-Bus Get/Set ScanInterval"""
    sta = dev[0]
    try:
        _test_dbus_scan_interval(dev, apdev)
    finally:
        # Always set the scan interval back to 5, no matter how the test body
        # ended.
        sta.request("SCAN_INTERVAL 5")
2732
def _test_dbus_scan_interval(dev, apdev):
    # Body of test_dbus_scan_interval(): read and write the ScanInterval
    # property with valid and invalid values.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def set_scan_interval(value):
        if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", value,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # A valid value has to be accepted and read back. The expected value is
    # kept in a variable so that the failure message can report it (the
    # message previously referenced an undefined name and would have raised
    # NameError instead of the intended exception).
    expected = 3
    set_scan_interval(dbus.Int32(expected))
    res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if res != expected:
        raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, expected))

    # Wrong D-Bus type
    try:
        set_scan_interval(dbus.UInt16(100))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: wrong property type" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e))
    else:
        raise Exception("Invalid Set(ScanInterval,100) accepted")

    # Correct type, negative value
    try:
        set_scan_interval(dbus.Int32(-1))
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: scan_interval must be >= 0" not in str(e):
            raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e))
    else:
        raise Exception("Invalid Set(ScanInterval,-1) accepted")

    set_scan_interval(dbus.Int32(5))
2761
def test_dbus_probe_req_reporting(dev, apdev):
    """D-Bus Probe Request reporting"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    # The second device runs a P2P find for the duration of the test.
    # NOTE(review): presumably this is the source of the Probe Request frames
    # that get reported - confirm.
    dev[1].p2p_find(social=True)

    # Each run adds a P2P group, subscribes to Probe Request reporting on the
    # group interface once GroupStarted is seen, and finishes on the first
    # ProbeRequest signal.
    # NOTE(review): loop, add_signal() and timeout() come from TestDbus, which
    # is defined elsewhere in this file.
    class TestDbusProbe(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.reported = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest",
                            byte_arrays=True)
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            # Both proxies are stored on the instance since the code after
            # the main loop uses them.
            self.iface = dbus.Interface(group_obj, WPAS_DBUS_IFACE)
            self.iface.SubscribeProbeReq()
            self.group_p2p = dbus.Interface(group_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def probeRequest(self, args):
            logger.debug("probeRequest: args=%s" % str(args))
            self.reported = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            return False

        def success(self):
            return self.reported

    # First run: unsubscribe explicitly and verify that a second unsubscribe
    # is rejected.
    with TestDbusProbe(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        t.iface.UnsubscribeProbeReq()
        try:
            t.iface.UnsubscribeProbeReq()
        except dbus.exceptions.DBusException as e:
            if "NoSubscription" not in str(e):
                raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e))
        else:
            raise Exception("Invalid UnsubscribeProbeReq() accepted")
        t.group_p2p.Disconnect()

    # Second run
    with TestDbusProbe(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.

    dev[1].p2p_stop_find()
2825
def test_dbus_probe_req_reporting_oom(dev, apdev):
    """D-Bus Probe Request reporting (OOM)"""
    sta = dev[0]
    _, _, _, if_obj = prepare_dbus(sta)
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    # Need to make sure this process has not already subscribed to avoid false
    # failures due to the operation succeeding due to os_strdup() not even
    # getting called.
    try:
        iface.UnsubscribeProbeReq()
    except dbus.exceptions.DBusException:
        was_subscribed = False
    else:
        was_subscribed = True

    # NOTE(review): alloc_fail_dbus() is defined elsewhere in this file;
    # presumably it makes the first allocation in
    # wpas_dbus_handler_subscribe_preq fail and validates the resulting D-Bus
    # error - confirm there.
    with alloc_fail_dbus(sta, 1, "wpas_dbus_handler_subscribe_preq",
                         "SubscribeProbeReq"):
        iface.SubscribeProbeReq()

    if was_subscribed:
        # On purpose, leave ProbeReq subscription in place to test automatic
        # cleanup.
        iface.SubscribeProbeReq()
2849
def test_dbus_p2p_invalid(dev, apdev):
    """D-Bus invalid P2P operations"""
    # Every section below issues one P2PDevice call that must be rejected.
    # The call is followed by a plain Exception ("... accepted") that only
    # triggers if the call unexpectedly succeeds; a DBusException is accepted
    # only if its text contains the expected error string.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # RejectPeer(): peer path below this interface that is expected to fail
    # in the reject operation itself
    try:
        p2p.RejectPeer(path + "/Peers/00112233445566")
        raise Exception("Invalid RejectPeer accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))

    # RejectPeer(): path that is not a peer object path at all
    try:
        p2p.RejectPeer("/foo")
        raise Exception("Invalid RejectPeer accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))

    # RemoveClient(): empty, unknown or mistyped dictionary entries
    tests = [{},
             {'peer': 'foo'},
             {'foo': "bar"},
             {'iface': "abc"},
             {'iface': 123}]
    for t in tests:
        try:
            p2p.RemoveClient(t)
            raise Exception("Invalid RemoveClient accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e))

    # Find(): invalid discovery type, malformed RequestedDeviceTypes values
    # and an unknown key carrying various D-Bus value types
    tests = [{'DiscoveryType': 'foo'},
             {'RequestedDeviceTypes': 'foo'},
             {'RequestedDeviceTypes': ['foo']},
             {'RequestedDeviceTypes': ['1', '2', '3', '4', '5', '6', '7', '8',
                                       '9', '10', '11', '12', '13', '14', '15',
                                       '16', '17']},
             {'RequestedDeviceTypes': dbus.Array([], signature="s")},
             {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")},
             {'RequestedDeviceTypes': dbus.Array([], signature="i")},
             {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'),
                                       dbus.ByteArray(b'1234567')]},
             {'Foo': dbus.Int16(1)},
             {'Foo': dbus.UInt16(1)},
             {'Foo': dbus.Int64(1)},
             {'Foo': dbus.UInt64(1)},
             {'Foo': dbus.Double(1.23)},
             {'Foo': dbus.Signature('s')},
             {'Foo': 'bar'}]
    for t in tests:
        try:
            p2p.Find(dbus.Dictionary(t))
            raise Exception("Invalid Find accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Find(): " + str(e))

    # RemovePersistentGroup(): object paths that must not be accepted as a
    # persistent group of this interface
    for p in ["/foo",
              "/fi/w1/wpa_supplicant1/Interfaces/1234",
              "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"]:
        try:
            p2p.RemovePersistentGroup(dbus.ObjectPath(p))
            raise Exception("Invalid RemovePersistentGroup accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))

    # Listen() while P2P has been disabled through the control interface;
    # P2P is re-enabled in the finally clause regardless of the outcome.
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Listen(5)
        raise Exception("Invalid Listen accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Could not start P2P listen" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Listen() with a string argument. A separate proxy object is created
    # with introspect=False - presumably so that the mistyped argument is not
    # rejected locally by dbus-python before reaching wpa_supplicant.
    test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False)
    test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    try:
        test_p2p.Listen("foo")
        raise Exception("Invalid Listen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid Listen: " + str(e))

    # ExtendedListen() while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.ExtendedListen(dbus.Dictionary({}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # PresenceRequest() while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        args = {'duration1': 30000, 'interval1': 102400,
                'duration2': 20000, 'interval2': 102400}
        p2p.PresenceRequest(args)
        raise Exception("Invalid PresenceRequest accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: Failed to invoke presence request" not in str(e):
            raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # GroupAdd() with a negative frequency
    try:
        params = dbus.Dictionary({'frequency': dbus.Int32(-1)})
        p2p.GroupAdd(params)
        raise Exception("Invalid GroupAdd accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))

    # GroupAdd() with the interface path given as the persistent group object
    try:
        params = dbus.Dictionary({'persistent_group_object':
                                  dbus.ObjectPath(path),
                                  'frequency': 2412})
        p2p.GroupAdd(params)
        raise Exception("Invalid GroupAdd accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid GroupAdd: " + str(e))

    # Disconnect() is expected to fail at this point of the test
    try:
        p2p.Disconnect()
        raise Exception("Invalid Disconnect accepted")
    except dbus.exceptions.DBusException as e:
        if "UnknownError: failed to disconnect" not in str(e):
            raise Exception("Unexpected error message for invalid Disconnect: " + str(e))

    # Flush() while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        p2p.Flush()
        raise Exception("Invalid Flush accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Flush: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect() while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        args = {'peer': path,
                'join': True,
                'wps_method': 'pbc',
                'frequency': 2412}
        p2p.Connect(args)
        raise Exception("Invalid Connect accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Connect: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Connect() with P2P enabled but invalid arguments
    tests = [{'frequency': dbus.Int32(-1)},
             {'wps_method': 'pbc'},
             {'wps_method': 'foo'}]
    for args in tests:
        try:
            p2p.Connect(args)
            raise Exception("Invalid Connect accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Connect: " + str(e))

    # Invite() while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        args = {'peer': path}
        p2p.Invite(args)
        raise Exception("Invalid Invite accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Invite: " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Invite() with an unknown dictionary key
    try:
        args = {'foo': 'bar'}
        p2p.Invite(args)
        raise Exception("Invalid Invite accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            # This failure is about Invite(), so report it as such.
            raise Exception("Unexpected error message for invalid Invite: " + str(e))

    # ProvisionDiscoveryRequest(): (peer path, config method, expected error)
    tests = [(path, 'display', "InvalidArgs"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'display',
              "UnknownError: Failed to send provision discovery request"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'keypad',
              "UnknownError: Failed to send provision discovery request"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'pbc',
              "UnknownError: Failed to send provision discovery request"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'pushbutton',
              "UnknownError: Failed to send provision discovery request"),
             (dbus.ObjectPath(path + "/Peers/00112233445566"),
              'foo', "InvalidArgs")]
    for (p, method, err) in tests:
        try:
            p2p.ProvisionDiscoveryRequest(p, method)
            raise Exception("Invalid ProvisionDiscoveryRequest accepted")
        except dbus.exceptions.DBusException as e:
            if err not in str(e):
                raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e))

    # Reading the Peers property while P2P is disabled
    try:
        dev[0].request("P2P_SET disabled 1")
        if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                   dbus_interface=dbus.PROPERTIES_IFACE)
        raise Exception("Invalid Get(Peers) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(Peers): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")
3071
def test_dbus_p2p_oom(dev, apdev):
    """D-Bus P2P operations and OOM"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    def blob():
        # Fresh three-octet byte array for each use
        return dbus.ByteArray(b'123')

    # Find() cases: (allocation failure count, allocation failure function
    # specification, factory for the value of the 'Foo' dictionary entry).
    # The value is built by a factory so that it is created only once the
    # allocation failure has been armed, just before the call is made.
    find_cases = [
        (1, "_wpa_dbus_dict_entry_get_string_array",
         lambda: ['bar']),
        (2, "_wpa_dbus_dict_entry_get_string_array",
         lambda: ['bar']),
        (10, "_wpa_dbus_dict_entry_get_string_array",
         lambda: [str(i) for i in range(1, 10)]),
        (1, ":=_wpa_dbus_dict_entry_get_binarray",
         lambda: [blob()]),
        (1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray",
         lambda: [blob()]),
        (2, "=_wpa_dbus_dict_entry_get_binarray",
         lambda: [blob() for _ in range(11)]),
        (1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray",
         lambda: [blob()]),
        (1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find",
         lambda: path),
    ]
    for count, funcs, make_value in find_cases:
        with alloc_fail_dbus(dev[0], count, funcs, "Find", "InvalidArgs"):
            p2p.Find(dbus.Dictionary({'Foo': make_value()}))

    # AddService() with a 500-octet response; fail the first and then the
    # second matching allocation.
    service = {'service_type': 'bonjour',
               'response': dbus.ByteArray(500*b'b')}
    for count in (1, 2):
        with alloc_fail_dbus(dev[0], count,
                             "_wpa_dbus_dict_entry_get_byte_array",
                             "AddService", "InvalidArgs"):
            p2p.AddService(service)
3129
def test_dbus_p2p_discovery(dev, apdev):
    """D-Bus P2P discovery"""
    # The actual test steps live in run_dbus_p2p_discovery(), which adds
    # vendor elements on dev[1]. Remove them here no matter how the run
    # ends, so that they are not left behind on that device.
    try:
        run_dbus_p2p_discovery(dev, apdev)
    finally:
        dev[1].request("VENDOR_ELEM_REMOVE 1 *")
3136
def run_dbus_p2p_discovery(dev, apdev):
    # Test steps for test_dbus_p2p_discovery(): dev[0] is driven over D-Bus
    # to discover dev[1] and dev[2], the reported peer properties are
    # verified, and Listen/ExtendedListen are exercised afterwards. The
    # caller removes the vendor elements added on dev[1].
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    # dev[1]: secondary device type plus two vendor elements; the
    # deviceFound() handler below checks for the matching peer properties.
    dev[1].request("SET sec_device_type 1-0050F204-2")
    dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344")
    dev[1].request("VENDOR_ELEM_ADD 1 dd06001122335566")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()
    a1 = binascii.unhexlify(addr1.replace(':', ''))

    # dev[2]: Wi-Fi Display enabled with a device info subelement; the
    # deviceFound() handler compares the peer's IEs property against wfd.
    wfd_devinfo = "00001c440028"
    dev[2].request("SET wifi_display 1")
    dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo)
    wfd = binascii.unhexlify('000006' + wfd_devinfo)
    dev[2].p2p_listen()
    addr2 = dev[2].p2p_dev_addr()
    a2 = binascii.unhexlify(addr2.replace(':', ''))

    # No peers are expected before discovery has been started
    res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if 'Peers' not in res:
        raise Exception("GetAll result missing Peers")
    if len(res['Peers']) != 0:
        raise Exception("Unexpected peer(s) in the list")

    # Start and immediately stop a Find with a full set of arguments
    args = {'DiscoveryType': 'social',
            'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')],
            'Timeout': dbus.Int32(1)}
    p2p.Find(dbus.Dictionary(args))
    p2p.StopFind()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.found = False         # dev[1] found and verified
            self.found2 = False        # dev[2] found and verified
            self.found_prop = False    # DeviceFoundProperties seen for dev[1]
            self.lost = False          # DeviceLost seen after both were found
            self.find_stopped = False  # FindStopped seen

        def __enter__(self):
            # run_test() starts the find once the main loop is running;
            # timeout() is scheduled 15 seconds out.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.deviceFoundProperties,
                            WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties")
            self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceLost")
            self.add_signal(self.provisionDiscoveryResponseEnterPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryResponseEnterPin")
            self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE,
                            "FindStopped")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # The new peer has to be listed in the Peers property
            res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers",
                             dbus_interface=dbus.PROPERTIES_IFACE)
            if len(res) < 1:
                raise Exception("Unexpected number of peers")
            if path not in res:
                raise Exception("Mismatch in peer object path")
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("peer properties: " + str(res))

            if res['DeviceAddress'] == a1:
                # dev[1]: secondary device type, vendor extension and VSIE
                if 'SecondaryDeviceTypes' not in res:
                    raise Exception("Missing SecondaryDeviceTypes")
                sec = res['SecondaryDeviceTypes']
                if len(sec) < 1:
                    raise Exception("Secondary device type missing")
                if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec:
                    raise Exception("Secondary device type mismatch")

                if 'VendorExtension' not in res:
                    raise Exception("Missing VendorExtension")
                vendor = res['VendorExtension']
                if len(vendor) < 1:
                    raise Exception("Vendor extension missing")
                if b"\x11\x22\x33\x44" not in vendor:
                    # Report this as a vendor extension problem so that it
                    # is not confused with the secondary device type check.
                    raise Exception("Vendor extension mismatch")

                if 'VSIE' not in res:
                    raise Exception("Missing VSIE")
                vendor = res['VSIE']
                if len(vendor) < 1:
                    raise Exception("VSIE missing")
                if vendor != b"\xdd\x06\x00\x11\x22\x33\x55\x66":
                    raise Exception("VSIE mismatch")

                self.found = True
            elif res['DeviceAddress'] == a2:
                # dev[2]: IEs property has to match the WFD data set above
                if 'IEs' not in res:
                    raise Exception("IEs missing")
                if res['IEs'] != wfd:
                    raise Exception("IEs mismatch")
                self.found2 = True
            else:
                raise Exception("Unexpected peer device address")

            if self.found and self.found2:
                # Both peers verified: stop the find, then reject the most
                # recently found peer and send it a provision discovery
                # request.
                p2p.StopFind()
                p2p.RejectPeer(path)
                p2p.ProvisionDiscoveryRequest(path, 'display')

        def deviceLost(self, path):
            logger.debug("deviceLost: path=%s" % path)
            if not self.found or not self.found2:
                # This may happen if a previous test case ended up scheduling
                # deviceLost event and that event did not get delivered before
                # starting the next test execution.
                logger.debug("Ignore deviceLost before the deviceFound events")
                return
            self.lost = True
            # Rejecting the lost peer is expected to fail
            try:
                p2p.RejectPeer(path)
                raise Exception("Invalid RejectPeer accepted")
            except dbus.exceptions.DBusException as e:
                if "UnknownError: Failed to call wpas_p2p_reject" not in str(e):
                    raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e))
            self.loop.quit()

        def deviceFoundProperties(self, path, properties):
            logger.debug("deviceFoundProperties: path=%s" % path)
            logger.debug("peer properties: " + str(properties))
            if properties['DeviceAddress'] == a1:
                self.found_prop = True

        def provisionDiscoveryResponseEnterPin(self, peer_object):
            logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object)
            p2p.Flush()

        def findStopped(self):
            logger.debug("findStopped")
            self.find_stopped = True

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social',
                                      'Timeout': dbus.Int32(10)}))
            # Returning False keeps this from being scheduled again
            return False

        def success(self):
            return self.found and self.lost and self.found2 and self.find_stopped

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].request("VENDOR_ELEM_REMOVE 1 *")
    dev[1].p2p_stop_find()

    # Put dev[0] into listen state over D-Bus and have dev[2] discover it
    p2p.Listen(1)
    dev[2].p2p_stop_find()
    dev[2].request("P2P_FLUSH")
    if not dev[2].discover_peer(addr0):
        raise Exception("Peer not found")
    p2p.StopFind()
    dev[2].p2p_stop_find()

    # ExtendedListen(): an unknown key has to be rejected
    try:
        p2p.ExtendedListen(dbus.Dictionary({'foo': 100}))
        raise Exception("Invalid ExtendedListen accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e))

    # Valid ExtendedListen() calls, followed by a P2P_EXT_LISTEN command
    # without parameters on the global control interface
    p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000}))
    p2p.ExtendedListen(dbus.Dictionary({}))
    dev[0].global_request("P2P_EXT_LISTEN")
3316
def test_dbus_p2p_discovery_freq(dev, apdev):
    """D-Bus P2P discovery on a specific non-social channel"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    # dev[1] is set up through autogo() on 2422 MHz; the Find() request
    # below is limited to that same frequency.
    autogo(dev[1], freq=2422)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once any DeviceFound signal has been received
            self.found = False

        def __enter__(self):
            # Start the find from within the main loop; timeout() is
            # scheduled five seconds out.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(5000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s", path)
            self.found = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_args = dbus.Dictionary({'freq': 2422})
            p2p.Find(find_args)
            # One-shot: do not get scheduled again
            return False

        def success(self):
            return self.found

    with TestDbusP2p(bus) as probe:
        if not probe.success():
            raise Exception("Expected signals not seen")

    dev[1].remove_group()
    p2p.StopFind()
3357
def test_dbus_p2p_service_discovery(dev, apdev):
    """D-Bus P2P service discovery"""
    # Exercises AddService/DeleteService/FlushService and
    # ServiceDiscoveryRequest/CancelRequest/Response on dev[0] with both
    # valid and invalid arguments. Invalid calls must fail with InvalidArgs.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()

    bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01'))
    bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027'))

    # Bonjour service: add, flush, add again
    args = {'service_type': 'bonjour',
            'query': bonjour_query,
            'response': bonjour_response}
    p2p.AddService(args)
    p2p.FlushService()
    p2p.AddService(args)

    # DeleteService() with the 'response' entry still included is rejected
    try:
        p2p.DeleteService(args)
        raise Exception("Invalid DeleteService() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # Deleting by query works once; the second attempt has to fail
    args = {'service_type': 'bonjour',
            'query': bonjour_query}
    p2p.DeleteService(args)
    try:
        p2p.DeleteService(args)
        raise Exception("Invalid DeleteService() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # UPnP service: add, delete, and a second delete that has to fail
    args = {'service_type': 'upnp',
            'version': 0x10,
            'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}
    p2p.AddService(args)
    p2p.DeleteService(args)
    try:
        p2p.DeleteService(args)
        raise Exception("Invalid DeleteService() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # DeleteService() argument sets that must be rejected
    tests = [{'service_type': 'foo'},
             {'service_type': 'foo', 'query': bonjour_query},
             {'service_type': 'upnp'},
             {'service_type': 'upnp', 'version': 0x10},
             {'service_type': 'upnp',
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'version': 0x10,
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'service_type': 'upnp', 'foo': 'bar'},
             {'service_type': 'bonjour'},
             {'service_type': 'bonjour', 'query': 'foo'},
             {'service_type': 'bonjour', 'foo': 'bar'}]
    for args in tests:
        try:
            p2p.DeleteService(args)
            raise Exception("Invalid DeleteService() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid DeleteService(): " + str(e))

    # AddService() argument sets that must be rejected
    tests = [{'service_type': 'foo'},
             {'service_type': 'upnp'},
             {'service_type': 'upnp', 'version': 0x10},
             {'service_type': 'upnp',
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'version': 0x10,
              'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'},
             {'service_type': 'upnp', 'foo': 'bar'},
             {'service_type': 'bonjour'},
             {'service_type': 'bonjour', 'query': 'foo'},
             {'service_type': 'bonjour', 'response': 'foo'},
             {'service_type': 'bonjour', 'query': bonjour_query},
             {'service_type': 'bonjour', 'response': bonjour_response},
             {'service_type': 'bonjour', 'query': dbus.ByteArray(500*b'a')},
             {'service_type': 'bonjour', 'foo': 'bar'}]
    for args in tests:
        try:
            p2p.AddService(args)
            raise Exception("Invalid AddService() accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid AddService(): " + str(e))

    # TLV based request: cancel works once; cancelling the same reference
    # again and cancelling reference 0 both have to fail
    args = {'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)
    try:
        p2p.ServiceDiscoveryCancelRequest(ref)
        raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ServiceDiscoveryCancelRequest(): " + str(e))
    try:
        p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0))
        raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ServiceDiscoveryCancelRequest(): " + str(e))

    # UPnP based request and its cancellation
    args = {'service_type': 'upnp',
            'version': 0x10,
            'service': 'ssdp:foo'}
    ref = p2p.ServiceDiscoveryRequest(args)
    p2p.ServiceDiscoveryCancelRequest(ref)

    # ServiceDiscoveryRequest() argument sets that must be rejected
    tests = [{'service_type': 'foo'},
             {'foo': 'bar'},
             {'tlv': 'foo'},
             {},
             {'version': 0},
             {'service_type': 'upnp',
              'service': 'ssdp:foo'},
             {'service_type': 'upnp',
              'version': 0x10},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': dbus.ObjectPath(path + "/Peers")},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': path + "/Peers"},
             {'service_type': 'upnp',
              'version': 0x10,
              'service': 'ssdp:foo',
              'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566")}]
    for args in tests:
        try:
            p2p.ServiceDiscoveryRequest(args)
            raise Exception("Invalid ServiceDiscoveryRequest accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e))

    # ServiceDiscoveryResponse() with an unknown key
    args = {'foo': 'bar'}
    try:
        p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv'))
        raise Exception("Invalid ServiceDiscoveryResponse accepted")
    except dbus.exceptions.DBusException as e:
        if "InvalidArgs" not in str(e):
            raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e))
3506
def test_dbus_p2p_service_discovery_query(dev, apdev):
    """D-Bus P2P service discovery query"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    # dev[1] gets a Bonjour service added and is put into listen state so
    # that dev[0] can find it and send it a service discovery request.
    dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027")
    dev[1].p2p_listen()
    addr1 = dev[1].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once a ServiceDiscoveryResponse signal has been received
            self.done = False

        def __enter__(self):
            # Start the find from within the main loop; timeout() is
            # scheduled 15 seconds out.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.serviceDiscoveryResponse,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ServiceDiscoveryResponse", byte_arrays=True)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s", path)
            # Send a TLV based query to the peer that was just found
            query = {'peer_object': path,
                     'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")}
            p2p.ServiceDiscoveryRequest(query)

        def serviceDiscoveryResponse(self, sd_request):
            logger.debug("serviceDiscoveryResponse: sd_request=%s",
                         str(sd_request))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            find_args = dbus.Dictionary({'DiscoveryType': 'social',
                                         'Timeout': dbus.Int32(10)})
            p2p.Find(find_args)
            # One-shot: do not get scheduled again
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as probe:
        if not probe.success():
            raise Exception("Expected signals not seen")

    dev[1].p2p_stop_find()
3558
def test_dbus_p2p_service_discovery_external(dev, apdev):
    """D-Bus P2P service discovery with external response"""
    # The test steps are in _test_dbus_p2p_service_discovery_external().
    # Whatever the outcome, external service discovery processing is
    # switched off on dev[0] afterwards (presumably it gets enabled during
    # the test steps).
    try:
        _test_dbus_p2p_service_discovery_external(dev, apdev)
    finally:
        dev[0].request("P2P_SERV_DISC_EXTERNAL 0")
3565
def _test_dbus_p2p_service_discovery_external(dev, apdev):
    # Test body: dev[1] sends a P2P service discovery request to dev[0] and
    # dev[0] answers it from this script through the D-Bus
    # ServiceDiscoveryResponse() method after ServiceDiscoveryExternal(1)
    # has been called.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    # Hex string of the TLVs sent back in the response; the event seen by
    # the requester is compared against this same string at the end.
    resp = "0300000101"

    requester = dev[1]
    requester.request("P2P_FLUSH")
    requester.request("P2P_SERV_DISC_REQ " + addr0 + " 02000001")
    requester.p2p_find(social=True)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Becomes True once a ServiceDiscoveryRequest signal was handled
            self.sd = False

        def __enter__(self):
            # Kick off the test almost immediately and arm a 15 second
            # timeout before subscribing to signals and running the loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            subscriptions = [
                (self.deviceFound, "DeviceFound"),
                (self.serviceDiscoveryRequest, "ServiceDiscoveryRequest"),
            ]
            for handler, signal_name in subscriptions:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name)
            self.loop.run()
            return self

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.ServiceDiscoveryExternal(1)
            p2p.ServiceUpdate()
            p2p.Listen(15)
            # False: do not run this timeout callback again
            return False

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def serviceDiscoveryRequest(self, sd_request):
            logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request))
            self.sd = True
            # Copy the identifying fields from the request into the reply
            # and attach the response TLVs.
            reply = {}
            for key in ('peer_object', 'frequency', 'dialog_token'):
                reply[key] = sd_request[key]
            reply['tlvs'] = dbus.ByteArray(binascii.unhexlify(resp))
            p2p.ServiceDiscoveryResponse(dbus.Dictionary(reply,
                                                         signature='sv'))
            self.loop.quit()

        def success(self):
            return self.sd

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    # The requester has to report the response with dev[0]'s address and
    # with the TLV data that was provided over D-Bus.
    ev = requester.wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5)
    if ev is None:
        raise Exception("Service discovery timed out")
    if addr0 not in ev:
        raise Exception("Unexpected address in SD Response: " + ev)
    fields = ev.split(' ')
    if fields[4] != resp:
        raise Exception("Unexpected response data SD Response: " + ev)
    requester.p2p_stop_find()

    p2p.StopFind()
    p2p.ServiceDiscoveryExternal(0)
3632
def test_dbus_p2p_autogo(dev, apdev):
    """D-Bus P2P autonomous GO"""
    # Expected signal-driven sequence on dev[0]:
    #  1. run_test: GroupAdd(persistent) -> PersistentGroupAdded stores the
    #     persistent group object, GroupStarted is handled next
    #  2. first GroupStarted: verify Role/Group/PeerGO, then Disconnect()
    #  3. GroupFinished: GroupAdd() again using the stored persistent group
    #  4. second GroupStarted: wlan1 is told to join with PIN 12345670
    #  5. ProvisionDiscoveryRequestDisplayPin: reject-check an invalid
    #     WPS.Start() and then authorize the peer
    #  6. StaAuthorized: verify Peer/Group properties and the
    #     WPSVendorExtensions setter, RemoveClient(), Disconnect()
    #  7. GroupFinished: RemovePersistentGroup() -> PersistentGroupRemoved
    #     stops the main loop
    # Handlers set self.exceptions before raising; success() checks that
    # flag (presumably because an exception raised from a signal callback
    # does not propagate to the with statement below).
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # True until the first GroupStarted signal has been handled
            self.first = True
            # True once the final Disconnect() has been requested
            self.waiting_end = False
            # Set by handlers when a verification step fails
            self.exceptions = False
            # Set when a StaDeauthorized signal is seen
            self.deauthorized = False
            # Set when PersistentGroupRemoved is seen
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.persistentGroupRemoved,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupRemoved")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            # The group interface has to report the local device as the GO
            # of the group that was just signaled, with no peer GO ('/').
            role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                     dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "GO":
                self.exceptions = True
                raise Exception("Unexpected role reported: " + role)
            group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                      dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                self.exceptions = True
                raise Exception("Unexpected Group reported: " + str(group))
            go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                                   dbus_interface=dbus.PROPERTIES_IFACE)
            if go != '/':
                self.exceptions = True
                raise Exception("Unexpected PeerGO value: " + str(go))
            if self.first:
                # First instance: tear it down so that groupFinished()
                # re-starts the group from the persistent group object.
                self.first = False
                logger.info("Remove persistent group instance")
                group_p2p = dbus.Interface(self.g_if_obj,
                                           WPAS_DBUS_IFACE_P2PDEVICE)
                group_p2p.Disconnect()
            else:
                # Re-started instance: have wlan1 join the group
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.waiting_end:
                logger.info("Remove persistent group")
                p2p.RemovePersistentGroup(self.persistent)
            else:
                logger.info("Re-start persistent group")
                params = dbus.Dictionary({'persistent_group_object':
                                          self.persistent,
                                          'frequency': 2412})
                p2p.GroupAdd(params)

        def persistentGroupAdded(self, path, properties):
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def persistentGroupRemoved(self, path):
            logger.debug("persistentGroupRemoved: %s" % path)
            self.done = True
            self.loop.quit()

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Stored for provisionDiscoveryRequestDisplayPin(), which reads
            # the peer's DeviceAddress from these properties.
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            self.peer_path = peer_object
            # Decode the last path component as six hex-encoded octets. The
            # resulting values are not used further, but the decoding raises
            # if the component does not have that form.
            peer = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)])

            # WPS.Start() with these parameters (Bssid included, no Pin) has
            # to be rejected with InvalidArgs.
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Bssid': self.peer['DeviceAddress'],
                      'Type': 'pin'}
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            try:
                wps.Start(params)
                self.exceptions = True
                raise Exception("Invalid WPS.Start() accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message: " + str(e))
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Type': 'pin',
                      'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            wps.Start(params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # The peer object has to list exactly the current group
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug("Peer properties: " + str(res))
            if 'Groups' not in res or len(res['Groups']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of peer Groups entries")
            if res['Groups'][0] != self.group:
                self.exceptions = True
                raise Exception("Unexpected peer Groups[0] value")

            # The group object has to list exactly one member
            g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group)
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))
            if 'Members' not in res or len(res['Members']) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of group members")

            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
            # Earlier implementation of this interface was a bit strange. The
            # property is defined to have aay signature and that is what the
            # getter returned. However, the setter expected there to be a
            # dictionary with 'WPSVendorExtensions' as the key surrounding these
            # values.. The current implementations maintains support for that
            # for backwards compability reasons. Verify that encoding first.
            vals = dbus.Dictionary({'WPSVendorExtensions': [ext]},
                                   signature='sv')
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                            dbus_interface=dbus.PROPERTIES_IFACE,
                            byte_arrays=True)
            if len(res) != 1:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != ext:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # And now verify that the more appropriate encoding is accepted as
            # well.
            res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff'))
            g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                      dbus_interface=dbus.PROPERTIES_IFACE)
            res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions',
                             dbus_interface=dbus.PROPERTIES_IFACE,
                             byte_arrays=True)
            # Check the value that was read back (res2). The locally extended
            # list (res) has two entries by construction, so checking it
            # would never detect a mismatch and would let res2[1] below
            # raise an unflagged IndexError.
            if len(res2) != 2:
                self.exceptions = True
                raise Exception("Unexpected number of vendor extensions")
            if res[0] != res2[0] or res[1] != res2[1]:
                self.exceptions = True
                raise Exception("Vendor extension value changed")

            # Twelve entries in total: this Set() has to fail
            for i in range(10):
                res.append(dbus.ByteArray(b'\xaa\xbb'))
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            # Dictionary with an unexpected key: InvalidArgs expected
            vals = dbus.Dictionary({'Foo': [ext]}, signature='sv')
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            # Array of strings instead of byte arrays: failure expected
            vals = ["foo"]
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            # Array of string arrays: failure expected
            vals = [["foo"]]
            try:
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                self.exceptions = True
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed" not in str(e):
                    self.exceptions = True
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            p2p.RemoveClient({'peer': self.peer_path})

            # From here on GroupFinished removes the persistent group
            # instead of re-starting it.
            self.waiting_end = True
            group_p2p = dbus.Interface(self.g_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_test(self, *args):
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # False: do not run this timeout callback again
            return False

        def success(self):
            return self.done and self.deauthorized and not self.exceptions

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[1].wait_go_ending_session()
3890
def test_dbus_p2p_autogo_pbc(dev, apdev):
    """D-Bus P2P autonomous GO and PBC"""
    # dev[0] starts a group over D-Bus, wlan1 joins it using PBC, the peer is
    # authorized with WPS.Start() and the group is disconnected once the
    # station has been authorized.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.first = True
            self.waiting_end = False
            # Set when GroupFinished is seen
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            subscriptions = [
                (self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFound"),
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.provisionDiscoveryPBCRequest,
                 WPAS_DBUS_IFACE_P2PDEVICE, "ProvisionDiscoveryPBCRequest"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            ]
            for handler, iface, signal_name in subscriptions:
                self.add_signal(handler, iface, signal_name)
            self.loop.run()
            return self

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # False: do not run this timeout callback again
            return False

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group = properties['group_object']
            group_if_path = properties['interface_object']
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, group_if_path)
            # Group is up: ask wlan1 to join it with PBC
            joiner = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            joiner.global_request("P2P_CONNECT " + addr0 + " pbc join")

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            found = bus.get_object(WPAS_DBUS_SERVICE, path)
            # Kept for provisionDiscoveryPBCRequest(), which needs the
            # DeviceAddress property of the peer.
            self.peer = found.GetAll(WPAS_DBUS_P2P_PEER,
                                     dbus_interface=dbus.PROPERTIES_IFACE,
                                     byte_arrays=True)
            logger.debug('peer properties: ' + str(self.peer))

        def provisionDiscoveryPBCRequest(self, peer_object):
            logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object)
            self.peer_path = peer_object
            # Decode the last path component as six hex-encoded octets; the
            # decoded values are not used afterwards.
            raw_addr = binascii.unhexlify(peer_object.split('/')[-1])
            octets = struct.unpack('6B', raw_addr)
            addr_str = ':'.join("%02x" % octet for octet in octets)
            wps_params = {
                'Role': 'registrar',
                'P2PDeviceAddress': self.peer['DeviceAddress'],
                'Type': 'pbc',
            }
            logger.info("Authorize peer to connect to the group")
            group_wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            group_wps.Start(wps_params)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            # Station connected: tear the group down to end the test
            dbus.Interface(self.g_if_obj,
                           WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    client = dev[1]
    client.wait_go_ending_session()
    client.flush_scan_cache()
3976
def test_dbus_p2p_autogo_legacy(dev, apdev):
    """D-Bus P2P autonomous GO and legacy STA"""
    # dev[0] starts a group over D-Bus and wlan1 connects to it with a plain
    # WPS_PIN command (no P2P_CONNECT) using the group BSSID.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set when GroupFinished is seen
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            subscriptions = [
                (self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupStarted"),
                (self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                 "GroupFinished"),
                (self.staAuthorized, WPAS_DBUS_IFACE, "StaAuthorized"),
            ]
            for handler, iface, signal_name in subscriptions:
                self.add_signal(handler, iface, signal_name)
            self.loop.run()
            return self

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.GroupAdd(dbus.Dictionary({'frequency': 2412}))
            # False: do not run this timeout callback again
            return False

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
            group_props = group_obj.GetAll(
                WPAS_DBUS_GROUP,
                dbus_interface=dbus.PROPERTIES_IFACE,
                byte_arrays=True)
            # Convert the six BSSID octets to colon-separated hex text
            octets = struct.unpack('6B', group_props['BSSID'])
            bssid = ':'.join("%02x" % octet for octet in octets)

            pin = '12345670'
            wps_params = {'Role': 'enrollee',
                          'Type': 'pin',
                          'Pin': pin}
            group_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                          properties['interface_object'])
            dbus.Interface(group_if_obj,
                           WPAS_DBUS_IFACE_WPS).Start(wps_params)

            # Have wlan1 find the group BSS and start WPS with the same PIN
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.scan_for_bss(bssid, freq=2412)
            sta.request("WPS_PIN " + bssid + " " + pin)
            # Used by staAuthorized() to disconnect the group
            self.group_p2p = dbus.Interface(group_if_obj,
                                            WPAS_DBUS_IFACE_P2PDEVICE)

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            sta = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            sta.request("DISCONNECT")
            self.group_p2p.Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")
4046
def test_dbus_p2p_join(dev, apdev):
    """D-Bus P2P join an autonomous GO"""
    # dev[1] runs an autonomous GO and dev[2] listens as a second peer.
    # dev[0] is driven over D-Bus: Find() until both peers were reported,
    # Connect() with join to the GO, verify the group properties as a
    # client, send a PresenceRequest(), Invite() dev[2] and finally have the
    # GO remove the group so that GroupFinished ends the main loop.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    dev[1].p2p_start_go(freq=2412)
    # Remembered so that the fresh WpaSupplicant('wlan1', ...) objects
    # created inside the signal handlers can address the GO's group
    # interface.
    dev1_group_ifname = dev[1].group_ifname
    dev[2].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set when GroupFinished is seen
            self.done = False
            # Peer object path of dev[2] (invitation target)
            self.peer = None
            # Peer object path of dev[1] (the GO to join)
            self.go = None

        def __enter__(self):
            # Start the test almost immediately and arm a 15 second timeout
            # before subscribing to signals and running the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationResult")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                  dbus_interface=dbus.PROPERTIES_IFACE,
                                  byte_arrays=True)
            logger.debug('peer properties: ' + str(res))
            # Classify the peer by looking for its address (without colons)
            # in the object path.
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path
            if self.peer and self.go:
                logger.info("Join the group")
                p2p.StopFind()
                args = {'peer': self.go,
                        'join': True,
                        'wps_method': 'pin',
                        'frequency': 2412}
                # The value returned by Connect() is used as the PIN that is
                # entered on the GO below.
                pin = p2p.Connect(args)

                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.group_ifname = dev1_group_ifname
                dev1.group_request("WPS_PIN any " + pin)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            # The group interface has to report the client role, the group
            # object from the signal and the discovered GO as PeerGO.
            role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role",
                                dbus_interface=dbus.PROPERTIES_IFACE)
            if role != "client":
                raise Exception("Unexpected role reported: " + role)
            group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group",
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            if group != properties['group_object']:
                raise Exception("Unexpected Group reported: " + str(group))
            go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO",
                              dbus_interface=dbus.PROPERTIES_IFACE)
            if go != self.go:
                raise Exception("Unexpected PeerGO value: " + str(go))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            # NOTE(review): 'ext' is assigned but never used; the Set() call
            # below passes 'res' (the GetAll() result above) as the value.
            # The call is expected to be rejected in this role -- confirm
            # whether a value built from 'ext' was intended before changing
            # it, since the error string is matched below.
            ext = dbus.ByteArray(b"\x11\x22\x33\x44")
            try:
                # Set(WPSVendorExtensions) not allowed for P2P Client
                g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res,
                          dbus_interface=dbus.PROPERTIES_IFACE)
                raise Exception("Invalid Set(WPSVendorExtensions) accepted")
            except dbus.exceptions.DBusException as e:
                if "Error.Failed: Failed to set property" not in str(e):
                    raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e))

            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            args = {'duration1': 30000, 'interval1': 102400,
                    'duration2': 20000, 'interval2': 102400}
            group_p2p.PresenceRequest(args)

            # Invite dev[2]; the outcome is handled in invitationResult()
            args = {'peer': self.peer}
            group_p2p.Invite(args)

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def invitationResult(self, result):
            logger.debug("invitationResult: " + str(result))
            # The test expects status value 1 for this invitation
            if result['status'] != 1:
                raise Exception("Unexpected invitation result: " + str(result))
            # Have the GO remove the group; GroupFinished is then expected
            # on dev[0], which ends the main loop.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_ifname = dev1_group_ifname
            dev1.remove_group()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # False: do not run this timeout callback again
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    dev[2].p2p_stop_find()
4171
def test_dbus_p2p_invitation_received(dev, apdev):
    """D-Bus P2P and InvitationReceived"""
    # wlan1 sends a P2P_INVITE for a persistent group to dev[0] and the test
    # passes once dev[0] reports it with the InvitationReceived signal.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # form() comes from the p2p_utils star import; presumably it sets up the
    # persistent group whose id is read from the peer entry below -- verify.
    form(dev[0], dev[1])
    invitee = dev[0]
    addr0 = invitee.p2p_dev_addr()
    invitee.p2p_listen()
    invitee.global_request("SET persistent_reconnect 0")

    inviter = dev[1]
    if not inviter.discover_peer(addr0, social=True):
        raise Exception("Peer " + addr0 + " not found")
    peer = inviter.get_peer(addr0)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set when InvitationReceived is seen
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.invitationReceived,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "InvitationReceived")
            self.loop.run()
            return self

        def run_test(self, *args):
            logger.debug("run_test")
            inviter_ctrl = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            inviter_ctrl.global_request(
                "P2P_INVITE persistent=" + peer['persistent'] +
                " peer=" + addr0)
            # False: do not run this timeout callback again
            return False

        def invitationReceived(self, result):
            logger.debug("invitationReceived: " + str(result))
            self.done = True
            self.loop.quit()

        def success(self):
            return self.done

    with TestDbusP2p(bus) as test:
        if not test.success():
            raise Exception("Expected signals not seen")

    invitee.p2p_stop_find()
    inviter.p2p_stop_find()
4220
def test_dbus_p2p_config(dev, apdev):
    """D-Bus Get/Set P2PDeviceConfig"""
    # The test body changes SsidPostfix through D-Bus. Send the control
    # interface command with an empty ssid_postfix value on dev[0]
    # regardless of whether the body returned normally or raised.
    clear_postfix_cmd = "P2P_SET ssid_postfix "
    try:
        _test_dbus_p2p_config(dev, apdev)
    finally:
        dev[0].request(clear_postfix_cmd)
4227
def _test_dbus_p2p_config(dev, apdev):
    # Worker for test_dbus_p2p_config(): exercise Get/Set of the
    # P2PDeviceConfig property on the P2PDevice D-Bus interface of dev[0].
    # Raises Exception on any unexpected result. The wrapper
    # test_dbus_p2p_config() clears ssid_postfix once this returns or raises.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])

    def get_config():
        # Read the P2PDeviceConfig dictionary; byte_arrays=True makes
        # dbus-python return byte-array values as dbus.ByteArray.
        return if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig",
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    def set_config(values):
        # Write the given dictionary to the P2PDeviceConfig property.
        if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", values,
                   dbus_interface=dbus.PROPERTIES_IFACE)

    # Step 1: writing back the unmodified dictionary must not change it.
    res = get_config()
    set_config(res)
    res2 = get_config()

    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        # Report a missing key as a test failure instead of a bare KeyError.
        if k not in res2 or res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)

    # Step 2: set an SSID postfix, one vendor extension and one secondary
    # device type, then verify that exactly one of each list entry is
    # reported back.
    changes = {'SsidPostfix': 'foo',
               'VendorExtension': [dbus.ByteArray(b'\x11\x22\x33\x44')],
               'SecondaryDeviceTypes': [dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88')]}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res2 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res2))
    if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1:
        raise Exception("VendorExtension does not match")
    if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1:
        raise Exception("SecondaryDeviceType does not match")

    # Step 3: empty values must remove the list entries from the dictionary.
    changes = {'SsidPostfix': '',
               'VendorExtension': dbus.Array([], signature="ay"),
               'SecondaryDeviceTypes': dbus.Array([], signature="ay")}
    set_config(dbus.Dictionary(changes, signature='sv'))

    res3 = get_config()
    logger.debug("P2PDeviceConfig: " + str(res3))
    if 'VendorExtension' in res3:
        raise Exception("VendorExtension not removed")
    if 'SecondaryDeviceTypes' in res3:
        raise Exception("SecondaryDeviceType not removed")

    # Step 4: with P2P disabled on the interface, Get must be rejected.
    # P2P is re-enabled in the finally clause regardless of the outcome.
    try:
        dev[0].request("P2P_SET disabled 1")
        get_config()
        raise Exception("Invalid Get(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Get(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Step 5: with P2P disabled on the interface, Set must be rejected too.
    try:
        dev[0].request("P2P_SET disabled 1")
        changes = {'SsidPostfix': 'foo'}
        set_config(dbus.Dictionary(changes, signature='sv'))
        raise Exception("Invalid Set(P2PDeviceConfig) accepted")
    except dbus.exceptions.DBusException as e:
        if "Error.Failed: P2P is not available for this interface" not in str(e):
            raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
    finally:
        dev[0].request("P2P_SET disabled 0")

    # Step 6: wrong value types and an unknown key must each be rejected
    # with an InvalidArgs error.
    tests = [{'DeviceName': 123},
             {'SsidPostfix': 123},
             {'Foo': 'Bar'}]
    for changes in tests:
        try:
            set_config(dbus.Dictionary(changes, signature='sv'))
            raise Exception("Invalid Set(P2PDeviceConfig) accepted")
        except dbus.exceptions.DBusException as e:
            if "InvalidArgs" not in str(e):
                raise Exception("Unexpected error message for invalid Set(P2PDeviceConfig): " + str(e))
4316
def test_dbus_p2p_persistent(dev, apdev):
    """D-Bus P2P persistent group"""
    # Phase 1 (signal driven): create a persistent group over D-Bus, record
    # the persistent group object path and tear the group down again.
    # Phase 2 (synchronous): read and modify the persistent group properties,
    # add a second persistent group, remove all of them and verify that a
    # removed group can no longer be removed.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Object path reported by PersistentGroupAdded; None until seen.
            self.persistent = None
            # Set once GroupFinished has been received.
            self.done = False

        def __enter__(self):
            # Schedule the test body and a 15000 ms watchdog, subscribe to
            # the signals of interest and block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            # The group is up; disconnect it right away through the group
            # interface object named in the signal.
            logger.debug("groupStarted: " + str(properties))
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def groupFinished(self, properties):
            # Group removal completed: end the signal-driven phase.
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def persistentGroupAdded(self, path, properties):
            # Remember the persistent group object for the second phase.
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def run_test(self, *args):
            # Timer callback: request a persistent group on 2412 MHz.
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # False removes the GLib timeout source (run only once).
            return False

        def success(self):
            # Both the persistent group path and the GroupFinished signal are
            # required; otherwise the main loop ended without the expected
            # signals (presumably through the watchdog) and the second phase
            # has nothing to work on.
            return self.done and self.persistent is not None

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
        persistent = t.persistent

    # Change the SSID of the stored group; all other properties must be
    # preserved and the SSID is reported back with surrounding quotes.
    p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent)
    res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                    dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True)
    logger.info("Persistent group Properties: " + str(res))
    vals = dbus.Dictionary({'ssid': 'DIRECT-foo'}, signature='sv')
    p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals,
              dbus_interface=dbus.PROPERTIES_IFACE)
    res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties",
                     dbus_interface=dbus.PROPERTIES_IFACE)
    if len(res) != len(res2):
        raise Exception("Different number of parameters")
    for k in res:
        if k != 'ssid' and res[k] != res2[k]:
            raise Exception("Parameter %s value changes" % k)
    if res2['ssid'] != '"DIRECT-foo"':
        raise Exception("Unexpected ssid")

    # Add a second persistent group directly; two groups must now be listed.
    args = dbus.Dictionary({'ssid': 'DIRECT-testing',
                            'psk': '1234567890'}, signature='sv')
    p2p.AddPersistentGroup(args)

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 2:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    p2p.RemoveAllPersistentGroups()

    groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups",
                        dbus_interface=dbus.PROPERTIES_IFACE)
    if len(groups) != 0:
        raise Exception("Unexpected number of persistent groups: " + str(groups))

    # The first group no longer exists, so removing it must be rejected.
    try:
        p2p.RemovePersistentGroup(persistent)
        raise Exception("Invalid RemovePersistentGroup accepted")
    except dbus.exceptions.DBusException as e:
        if "NetworkUnknown: There is no such persistent group" not in str(e):
            raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e))
4409
def test_dbus_p2p_reinvoke_persistent(dev, apdev):
    """D-Bus P2P reinvoke persistent group"""
    # Signal-driven test in two rounds, selected by the self.invited flag:
    # round 1 creates a persistent group on dev[0] and lets wlan1 join it
    # with a PIN; round 2 re-invokes the stored group with Invite() and ends
    # the test when that group finishes. Several handlers block on wlan1
    # control interface events while running inside the main loop.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # NOTE(review): first and waiting_end are assigned here but are
            # not read anywhere in this test.
            self.first = True
            self.waiting_end = False
            self.done = False
            # Becomes True once the valid Invite() has been issued; switches
            # groupStarted() and groupFinished() to their round 2 behavior.
            self.invited = False

        def __enter__(self):
            # Schedule the test body and a 15000 ms watchdog, subscribe to
            # all signals used below and block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.persistentGroupAdded,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "PersistentGroupAdded")
            self.add_signal(self.provisionDiscoveryRequestDisplayPin,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "ProvisionDiscoveryRequestDisplayPin")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Keep the group interface object; provisionDiscoveryRequest-
            # DisplayPin() and staAuthorized() use it later.
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            if not self.invited:
                # Round 1: read the group BSSID from the Group object, have
                # wlan1 scan for it on 2412 MHz and start a PIN based join.
                g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['group_object'])
                res = g_obj.GetAll(WPAS_DBUS_GROUP,
                                   dbus_interface=dbus.PROPERTIES_IFACE,
                                   byte_arrays=True)
                # Convert the 6-byte BSSID into colon separated hex text.
                bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])])
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.scan_for_bss(bssid, freq=2412)
                dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join")

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            if self.invited:
                # Round 2 group has ended: the test is complete.
                self.done = True
                self.loop.quit()
            else:
                # Round 1 group has ended: prepare wlan1 to accept the
                # re-invocation and keep it in listen state.
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.global_request("SET persistent_reconnect 1")
                dev1.p2p_listen()

                # Negative case: "path" here is the interface object path
                # returned by prepare_dbus(), not a persistent group object,
                # so Invite() is expected to fail with InvalidArgs.
                args = {'persistent_group_object': dbus.ObjectPath(path),
                        'peer': self.peer_path}
                try:
                    pin = p2p.Invite(args)
                    raise Exception("Invalid Invite accepted")
                except dbus.exceptions.DBusException as e:
                    if "InvalidArgs" not in str(e):
                        raise Exception("Unexpected error message for invalid Invite: " + str(e))

                # Positive case: invite the peer to the stored persistent
                # group recorded by persistentGroupAdded().
                args = {'persistent_group_object': self.persistent,
                        'peer': self.peer_path}
                pin = p2p.Invite(args)
                self.invited = True

                # Block until wlan1 reports the re-invoked group; the event
                # is consumed later by staAuthorized().
                self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                           timeout=15)
                if self.sta_group_ev is None:
                    raise Exception("P2P-GROUP-STARTED event not seen")

        def persistentGroupAdded(self, path, properties):
            # Remember the persistent group object path for the later Invite().
            logger.debug("persistentGroupAdded: %s %s" % (path, str(properties)))
            self.persistent = path

        def deviceFound(self, path):
            # Cache the peer properties; DeviceAddress is needed when the
            # provision discovery request arrives.
            logger.debug("deviceFound: path=%s" % path)
            peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path)
            self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER,
                                        dbus_interface=dbus.PROPERTIES_IFACE,
                                        byte_arrays=True)

        def provisionDiscoveryRequestDisplayPin(self, peer_object, pin):
            logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin))
            # Peer object path is reused as the Invite() target in
            # groupFinished().
            self.peer_path = peer_object
            # NOTE(review): peer and addr are computed from the last path
            # component but are not used afterwards in this handler.
            peer = binascii.unhexlify(peer_object.split('/')[-1])
            addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)])
            params = {'Role': 'registrar',
                      'P2PDeviceAddress': self.peer['DeviceAddress'],
                      'Bssid': self.peer['DeviceAddress'],
                      'Type': 'pin',
                      'Pin': '12345670'}
            logger.info("Authorize peer to connect to the group")
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            # Start WPS as registrar on the group interface with the same PIN
            # that wlan1 used in its P2P_CONNECT ... join command.
            wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS)
            wps.Start(params)
            # Block until wlan1 reports the group; staAuthorized() consumes
            # the stored event.
            self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"],
                                                       timeout=15)
            if self.sta_group_ev is None:
                raise Exception("P2P-GROUP-STARTED event not seen")

        def staAuthorized(self, name):
            # The station has been authorized in the group: process the group
            # start event on wlan1, remove the group there, wait for the
            # removal and then disconnect the group on the dev[0] side, which
            # leads to GroupFinished.
            logger.debug("staAuthorized: " + name)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()
            ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10)
            if ev is None:
                raise Exception("Group removal timed out")
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def run_test(self, *args):
            # Timer callback: request a persistent group on 2412 MHz.
            logger.debug("run_test")
            params = dbus.Dictionary({'persistent': True,
                                      'frequency': 2412})
            logger.info("Add a persistent group")
            p2p.GroupAdd(params)
            # False removes the GLib timeout source (run only once).
            return False

        def success(self):
            # True only after the round 2 GroupFinished signal was handled.
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4544
def test_dbus_p2p_go_neg_rx(dev, apdev):
    """D-Bus P2P GO Negotiation receive"""
    # dev[0] listens through D-Bus while wlan1 discovers it and starts GO
    # Negotiation. The request is answered with Connect(), the resulting
    # group is disconnected as soon as it starts, and GroupFinished ends the
    # test.
    (bus, _wpas_obj, _path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # Timers first (test body after 1 ms, watchdog after 15000 ms),
            # then the signal subscriptions, then the blocking main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            subscriptions = [
                (self.deviceFound, "DeviceFound", {}),
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.groupFinished, "GroupFinished", {}),
            ]
            for handler, signal_name, extra in subscriptions:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name, **extra)
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)

            accept = {'peer': path, 'wps_method': 'display',
                      'pin': '12345670', 'go_intent': 15,
                      'persistent': False}

            # Negative case first: the same request with frequency 5175 must
            # be refused with ConnectChannelUnsupported.
            bad_channel = dict(accept, frequency=5175)
            try:
                p2p.Connect(bad_channel)
            except dbus.exceptions.DBusException as err:
                if "ConnectChannelUnsupported" not in str(err):
                    raise Exception("Unexpected error message for invalid Connect: " + str(err))
            else:
                raise Exception("Invalid Connect accepted")

            # Now accept the negotiation without forcing a frequency.
            p2p.Connect(accept)

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Tear the group down immediately via its interface object.
            group_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                       properties['interface_object'])
            dbus.Interface(group_obj, WPAS_DBUS_IFACE_P2PDEVICE).Disconnect()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            # Timer callback: start listening on dev[0], then let wlan1 find
            # it and initiate GO Negotiation with PIN entry.
            logger.debug("run_test")
            p2p.Listen(10)
            initiator = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not initiator.discover_peer(addr0):
                raise Exception("Peer not found")
            initiator.global_request("".join(["P2P_CONNECT ", addr0,
                                              " 12345670 enter"]))
            # False removes the GLib timeout source (run only once).
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")
4626
def test_dbus_p2p_go_neg_auth(dev, apdev):
    """D-Bus P2P GO Negotiation authorized"""
    # dev[0] pre-authorizes GO Negotiation with the found peer (Connect()
    # with authorize_only) and then listens; wlan1 initiates the negotiation.
    # The test passes when GroupFinished as well as the PeerJoined and
    # PeerDisconnected group signals have been seen.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_joined = False
            self.peer_disconnected = False

        def __enter__(self):
            # Schedule the test body and a 15000 ms watchdog, subscribe to
            # the signals used below and block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP,
                            "PeerDisconnected")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Negative case: keypad method without a pin must be rejected
            # with InvalidArgs.
            args = {'peer': path, 'wps_method': 'keypad',
                    'go_intent': 15, 'authorize_only': True}
            try:
                p2p.Connect(args)
                raise Exception("Invalid Connect accepted")
            except dbus.exceptions.DBusException as e:
                if "InvalidArgs" not in str(e):
                    raise Exception("Unexpected error message for invalid Connect: " + str(e))

            # Authorize the peer with a PIN, go to listen state and let wlan1
            # initiate the negotiation with GO intent 0.
            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 15, 'authorize_only': True}
            p2p.Connect(args)
            p2p.Listen(10)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not dev1.discover_peer(addr0):
                raise Exception("Peer not found")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0")
            # Block until wlan1 reports the group; groupStarted() consumes
            # the stored event.
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            self.sta_group_ev = ev

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Keep the group interface object for staDeauthorized(), then
            # have wlan1 process its group start event and leave the group.
            self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                           properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()

        def staDeauthorized(self, name):
            # The station is gone: disconnect the group on the dev[0] side,
            # which leads to GroupFinished.
            logger.debug("staDeauthorized: " + name)
            group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            self.peer_joined = True

        def peerDisconnected(self, peer):
            logger.debug("peerDisconnected: " + peer)
            self.peer_disconnected = True

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            # Timer callback: start a social-channel P2P find on dev[0].
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # False removes the GLib timeout source (run only once).
            return False

        def success(self):
            return self.done and self.peer_joined and self.peer_disconnected

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4727
def test_dbus_p2p_go_neg_init(dev, apdev):
    """D-Bus P2P GO Negotiation initiation"""
    # dev[0] initiates GO Negotiation over D-Bus with GO intent 0 and wlan1
    # answers with GO intent 15. dev[0] disconnects from the group as soon as
    # it starts. The main loop is ended by the PropertiesChanged signal that
    # reports an empty Groups list for the peer (after a non-empty one).
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Schedule the test body and a 15000 ms watchdog, subscribe to
            # the signals used below and block in the main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            # Initiate GO Negotiation from dev[0] with GO intent 0.
            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 0}
            p2p.Connect(args)

            # Wait for wlan1 to see the request, then have it respond with
            # GO intent 15 and wait for its group start event.
            ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Stored for groupStarted(), which passes it to
            # group_form_result().
            self.sta_group_ev = ev
            # Explicitly close this wrapper's monitor connections and drop
            # the reference; groupStarted() creates its own wrapper with
            # monitor=False.
            dev1.close_monitor_global()
            dev1.close_monitor_mon()
            dev1 = None

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # Disconnect dev[0] from the group right away through the group
            # interface object named in the signal.
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            # Process the stored group start event on wlan1 and remove the
            # group there as well.
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False)
            dev1.group_form_result(self.sta_group_ev)
            dev1.remove_group()
            dev1 = None

        def groupFinished(self, properties):
            # Only records the signal; the main loop is ended from
            # propertiesChanged().
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only changes of the "Groups" property on Peer objects are of
            # interest here.
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            if "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            if len(changed_properties["Groups"]) == 0:
                if not self.peer_group_added:
                    # This is likely a leftover event from an earlier test case,
                    # ignore it to allow this test case to go through its steps.
                    logger.info("Ignore propertiesChanged indicating group removal before group has been added")
                    return
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            # Timer callback: start a social-channel P2P find on dev[0].
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # False removes the GLib timeout source (run only once).
            return False

        def success(self):
            # GroupFinished plus both the group-added and group-removed peer
            # property updates are required.
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
4826
def test_dbus_p2p_group_termination_by_go(dev, apdev):
    """D-Bus P2P group removal on GO terminating the group"""
    # dev[0] initiates GO Negotiation with GO intent 0 and wlan1 answers with
    # GO intent 15. Once the group is up, wlan1 removes it; dev[0] must then
    # report GroupFinished and an emptied Groups list for the peer.
    (bus, _wpas_obj, _path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Timers first (test body after 1 ms, watchdog after 15000 ms),
            # then the signal subscriptions, then the blocking main loop.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            p2p_signals = [
                (self.deviceFound, "DeviceFound", {}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.groupFinished, "GroupFinished", {}),
            ]
            for handler, signal_name, extra in p2p_signals:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE,
                                signal_name, **extra)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            responder = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            # Initiate GO Negotiation from dev[0] with GO intent 0.
            p2p.Connect({'peer': path, 'wps_method': 'keypad',
                         'pin': '12345670', 'go_intent': 0})

            request = responder.wait_global_event(["P2P-GO-NEG-REQUEST"],
                                                  timeout=15)
            if request is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            responder.global_request("".join(["P2P_CONNECT ", addr0,
                                              " 12345670 display go_intent=15"]))
            started = responder.wait_global_event(["P2P-GROUP-STARTED"],
                                                  timeout=15)
            if started is None:
                raise Exception("Group formation timed out")
            # Kept for groupStarted(), which hands it to group_form_result().
            self.sta_group_ev = started
            responder.close_monitor_global()
            responder.close_monitor_mon()
            responder = None

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            # The proxy object is looked up but not used any further here.
            group_proxy = bus.get_object(WPAS_DBUS_SERVICE,
                                         properties['interface_object'])
            # The group is removed from the wlan1 side in this test.
            go = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False)
            go.group_form_result(self.sta_group_ev)
            go.remove_group()

        def groupFinished(self, properties):
            # Only records the signal; propertiesChanged() ends the loop.
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only "Groups" updates on Peer objects matter here.
            if (interface_name != WPAS_DBUS_P2P_PEER or
                    "Groups" not in changed_properties):
                return
            group_count = len(changed_properties["Groups"])
            if group_count > 0:
                self.peer_group_added = True
            elif self.peer_group_added:
                # Empty list after a non-empty one: the peer left the group.
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            # Timer callback: start a social-channel P2P find on dev[0].
            logger.debug("run_test")
            find_params = dbus.Dictionary({'DiscoveryType': 'social'})
            p2p.Find(find_params)
            # False removes the GLib timeout source (run only once).
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as runner:
        if not runner.success():
            raise Exception("Expected signals not seen")
4917
def test_dbus_p2p_group_idle_timeout(dev, apdev):
    """D-Bus P2P group removal on idle timeout"""
    try:
        # Run the actual test with p2p_group_idle set to 1 on dev[0]
        dev[0].global_request("SET p2p_group_idle 1")
        _test_dbus_p2p_group_idle_timeout(dev, apdev)
    finally:
        # Restore p2p_group_idle to 0 regardless of how the test body ended
        dev[0].global_request("SET p2p_group_idle 0")
4925
def _test_dbus_p2p_group_idle_timeout(dev, apdev):
    # Body of test_dbus_p2p_group_idle_timeout(): dev[0] is operated through
    # the D-Bus P2PDevice interface while the peer is operated through the
    # wlan1 control interface from within the signal handlers.
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once GroupFinished has been received
            self.done = False
            # Set once GroupStarted has been received
            self.group_started = False
            # Track the Groups property of the Peer object going from
            # non-empty to empty
            self.peer_group_added = False
            self.peer_group_removed = False

        def __enter__(self):
            # Start the test from the main loop and arm a 15000 ms timeout
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.goNegotiationSuccess,
                            WPAS_DBUS_IFACE_P2PDEVICE,
                            "GONegotiationSuccess",
                            byte_arrays=True)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged")
            # Blocks until one of the handlers (or the timeout) stops the loop
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            # Start GO Negotiation over D-Bus with GO intent 0
            args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670',
                    'go_intent': 0}
            p2p.Connect(args)

            # Complete the negotiation from the peer with GO intent 15
            ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15)
            if ev is None:
                raise Exception("Timeout while waiting for GO Neg Request")
            dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15")
            ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
            if ev is None:
                raise Exception("Group formation timed out")
            # Saved for groupStarted(), which parses it with group_form_result()
            self.sta_group_ev = ev
            dev1.close_monitor_global()
            dev1.close_monitor_mon()
            dev1 = None

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.group_started = True
            g_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                      properties['interface_object'])
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False)
            dev1.group_form_result(self.sta_group_ev)
            # First line of the STA-FIRST response is used as the station
            # address for the DEAUTHENTICATE command below
            ifaddr = dev1.group_request("STA-FIRST").splitlines()[0]
            # Force disassociation with different reason code so that the
            # P2P Client using D-Bus does not get normal group termination event
            # from the GO.
            dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0")
            dev1.remove_group()

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))
            self.done = True

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))
            # Only Groups changes on the Peer interface that arrive after
            # GroupStarted are considered
            if interface_name != WPAS_DBUS_P2P_PEER:
                return
            if not self.group_started:
                return
            if "Groups" not in changed_properties:
                return
            if len(changed_properties["Groups"]) > 0:
                self.peer_group_added = True
            if len(changed_properties["Groups"]) == 0:
                # Empty Groups list ends the test
                self.peer_group_removed = True
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False makes this a one-shot timeout callback
            return False

        def success(self):
            return self.done and self.peer_group_added and self.peer_group_removed

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5024
def test_dbus_p2p_wps_failure(dev, apdev):
    """D-Bus P2P WPS failure"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)
    addr0 = dev[0].p2p_dev_addr()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Both signals have to be seen for the test to pass
            self.wps_failed = False
            self.formation_failure = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            # (handler, signal name, extra add_signal() keyword arguments);
            # all of these are P2PDevice interface signals
            signals = (
                (self.goNegotiationRequest, "GONegotiationRequest",
                 {'byte_arrays': True}),
                (self.goNegotiationSuccess, "GONegotiationSuccess",
                 {'byte_arrays': True}),
                (self.groupStarted, "GroupStarted", {}),
                (self.wpsFailed, "WpsFailed", {}),
                (self.groupFormationFailure, "GroupFormationFailure", {}),
            )
            for handler, signal, kwargs in signals:
                self.add_signal(handler, WPAS_DBUS_IFACE_P2PDEVICE, signal,
                                **kwargs)
            self.loop.run()
            return self

        def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0):
            logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent))
            if dev_passwd_id != 1:
                raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id)
            # Reply with a PIN that differs from the one the peer entered
            p2p.Connect({'peer': path,
                         'wps_method': 'display',
                         'pin': '12345670',
                         'go_intent': 15})

        def goNegotiationSuccess(self, properties):
            logger.debug("goNegotiationSuccess: properties=%s" % str(properties))

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            raise Exception("Unexpected GroupStarted")

        def wpsFailed(self, name, args):
            logger.debug("wpsFailed - name=%s args=%s" % (name, str(args)))
            self.wps_failed = True
            # Stop only after the other expected signal has been seen as well
            if self.formation_failure:
                self.loop.quit()

        def groupFormationFailure(self, reason):
            logger.debug("groupFormationFailure - reason=%s" % reason)
            self.formation_failure = True
            # Stop only after the other expected signal has been seen as well
            if self.wps_failed:
                self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Listen(10)
            peer = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            if not peer.discover_peer(addr0):
                raise Exception("Peer not found")
            peer.global_request("P2P_CONNECT " + addr0 + " 87654321 enter")
            return False

        def success(self):
            return self.wps_failed and self.formation_failure

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5100
def test_dbus_p2p_two_groups(dev, apdev):
    """D-Bus P2P with two concurrent groups"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # NOTE(review): presumably makes dev[0] use a separate interface for each
    # group - confirm against wpa_supplicant configuration documentation
    dev[0].request("SET p2p_no_group_iface 0")
    addr0 = dev[0].p2p_dev_addr()
    addr1 = dev[1].p2p_dev_addr()
    addr2 = dev[2].p2p_dev_addr()
    # dev[1] operates the group that dev[0] joins first
    dev[1].p2p_start_go(freq=2412)
    dev1_group_ifname = dev[1].group_ifname

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once both groups have been reported as finished
            self.done = False
            # D-Bus object paths of the discovered peers (dev[2] and dev[1])
            self.peer = None
            self.go = None
            # Group object paths in the order GroupStarted was received
            self.group1 = None
            self.group2 = None
            # Set after Disconnect() has been issued for both groups
            self.groups_removed = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE,
                            "PropertiesChanged", byte_arrays=True)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupFinished")
            self.add_signal(self.peerJoined, WPAS_DBUS_GROUP,
                            "PeerJoined")
            self.loop.run()
            return self

        def propertiesChanged(self, interface_name, changed_properties,
                              invalidated_properties):
            # Logged for debugging only
            logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties)))

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Peer object paths are matched on the address without colons
            if addr2.replace(':', '') in path:
                self.peer = path
            elif addr1.replace(':', '') in path:
                self.go = path
            if self.go and not self.group1:
                logger.info("Join the group")
                p2p.StopFind()
                pin = '12345670'
                # Enable the PIN on the GO through its group interface before
                # requesting the join over D-Bus
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.group_ifname = dev1_group_ifname
                dev1.group_request("WPS_PIN any " + pin)
                args = {'peer': self.go,
                        'join': True,
                        'wps_method': 'pin',
                        'pin': pin,
                        'frequency': 2412}
                p2p.Connect(args)

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            g_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                   properties['group_object'])
            res = g_obj.GetAll(WPAS_DBUS_GROUP,
                               dbus_interface=dbus.PROPERTIES_IFACE,
                               byte_arrays=True)
            logger.debug("Group properties: " + str(res))

            if not self.group1:
                # First GroupStarted: record it and request a second group
                # with GroupAdd()
                self.group1 = properties['group_object']
                self.group1iface = properties['interface_object']
                self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group1iface)

                logger.info("Start autonomous GO")
                params = dbus.Dictionary({'frequency': 2412})
                p2p.GroupAdd(params)
            elif not self.group2:
                # Second GroupStarted: record it along with its BSSID
                self.group2 = properties['group_object']
                self.group2iface = properties['interface_object']
                self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE,
                                                self.group2iface)
                self.g2_bssid = res['BSSID']

            if self.group1 and self.group2:
                logger.info("Authorize peer to join the group")
                a2 = binascii.unhexlify(addr2.replace(':', ''))
                params = {'Role': 'enrollee',
                          'P2PDeviceAddress': dbus.ByteArray(a2),
                          'Bssid': dbus.ByteArray(a2),
                          'Type': 'pin',
                          'Pin': '12345670'}
                g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS)
                g_wps.Start(params)

                # Convert the raw BSSID bytes into colon-separated hex text
                # for the control interface commands
                bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', self.g2_bssid)])
                dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
                dev2.scan_for_bss(bssid, freq=2412)
                dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412")
                ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15)
                if ev is None:
                    raise Exception("Group join timed out")
                # Saved for peerJoined(), which parses it with
                # group_form_result()
                self.dev2_group_ev = ev

        def groupFinished(self, properties):
            logger.debug("groupFinished: " + str(properties))

            if self.group1 == properties['group_object']:
                self.group1 = None
            elif self.group2 == properties['group_object']:
                self.group2 = None

            # The test is complete once neither group remains
            if not self.group1 and not self.group2:
                self.done = True
                self.loop.quit()

        def peerJoined(self, peer):
            logger.debug("peerJoined: " + peer)
            # Ignore PeerJoined signals received after teardown has started
            if self.groups_removed:
                return
            self.check_results()

            dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2')
            dev2.group_form_result(self.dev2_group_ev)
            dev2.remove_group()

            logger.info("Disconnect group2")
            group_p2p = dbus.Interface(self.g2_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()

            logger.info("Disconnect group1")
            group_p2p = dbus.Interface(self.g1_if_obj,
                                       WPAS_DBUS_IFACE_P2PDEVICE)
            group_p2p.Disconnect()
            self.groups_removed = True

        def check_results(self):
            # Verify the Role and Members properties while both groups are
            # still in operation
            logger.info("Check results with two concurrent groups in operation")

            g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1)
            res1 = g1_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2)
            res2 = g2_obj.GetAll(WPAS_DBUS_GROUP,
                                 dbus_interface=dbus.PROPERTIES_IFACE,
                                 byte_arrays=True)

            logger.info("group1 = " + self.group1)
            logger.debug("Group properties: " + str(res1))

            logger.info("group2 = " + self.group2)
            logger.debug("Group properties: " + str(res2))

            prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE,
                                 dbus_interface=dbus.PROPERTIES_IFACE)
            logger.debug("p2pdevice properties: " + str(prop))

            if res1['Role'] != 'client':
                raise Exception("Group1 role reported incorrectly: " + res1['Role'])
            if res2['Role'] != 'GO':
                raise Exception("Group2 role reported incorrectly: " + res2['Role'])
            if prop['Role'] != 'device':
                raise Exception("p2pdevice role reported incorrectly: " + prop['Role'])

            if len(res2['Members']) != 1:
                   raise Exception("Unexpected Members value for group 2")

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            # Returning False makes this a one-shot timeout callback
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")

    # Tear down the group that dev[1] started before the D-Bus part
    dev[1].remove_group()
5291
def test_dbus_p2p_cancel(dev, apdev):
    """D-Bus P2P Cancel"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Cancel() before any Connect() is expected to raise a D-Bus error
    try:
        p2p.Cancel()
    except dbus.exceptions.DBusException:
        pass
    else:
        raise Exception("Unexpected p2p.Cancel() success")

    addr0 = dev[0].p2p_dev_addr()
    dev[1].p2p_listen()

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once Connect() followed by Cancel() has gone through
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE,
                            "DeviceFound")
            self.loop.run()
            return self

        def deviceFound(self, path):
            logger.debug("deviceFound: path=%s" % path)
            # Start group formation and cancel it right away
            p2p.Connect({'peer': path,
                         'wps_method': 'keypad',
                         'pin': '12345670',
                         'go_intent': 0})
            p2p.Cancel()
            self.done = True
            self.loop.quit()

        def run_test(self, *args):
            logger.debug("run_test")
            p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'}))
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5338
def test_dbus_p2p_ip_addr(dev, apdev):
    """D-Bus P2P and IP address parameters"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE)

    # Each IP address property has to read back the value written to it
    vals = [("IpAddrGo", "192.168.43.1"),
            ("IpAddrMask", "255.255.255.0"),
            ("IpAddrStart", "192.168.43.100"),
            ("IpAddrEnd", "192.168.43.199")]
    for field, value in vals:
        if_obj.Set(WPAS_DBUS_IFACE, field, value,
                   dbus_interface=dbus.PROPERTIES_IFACE)
        val = if_obj.Get(WPAS_DBUS_IFACE, field,
                         dbus_interface=dbus.PROPERTIES_IFACE)
        if val != value:
            raise Exception("Unexpected %s value: %s" % (field, val))

    set_ip_addr_info(dev[1])

    dev[0].global_request("SET p2p_go_intent 0")

    # Form the group using NFC connection handover, with dev[0] as the
    # initiator (request) and dev[1] as the responder (select)
    req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip()
    if "FAIL" in req:
        raise Exception("Failed to generate NFC connection handover request")
    sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip()
    if "FAIL" in sel:
        raise Exception("Failed to generate NFC connection handover select")
    dev[0].dump_monitor()
    dev[1].dump_monitor()
    res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel)
    if "FAIL" in res:
        raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)")
    res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel)
    if "FAIL" in res:
        raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)")

    class TestDbusP2p(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # Set once GroupStarted with an IpAddrGo entry has been processed
            self.done = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE,
                            "GroupStarted")
            self.loop.run()
            return self

        def groupStarted(self, properties):
            logger.debug("groupStarted: " + str(properties))
            self.loop.quit()

            if 'IpAddrGo' not in properties:
                # Leave self.done unset so that the test is reported as
                # failed, instead of running into a KeyError below.
                logger.info("IpAddrGo missing from GroupStarted")
                return
            ip_addr_go = properties['IpAddrGo']
            addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1], ip_addr_go[2], ip_addr_go[3])
            # NOTE(review): an unexpected address is only logged and does not
            # fail the test - confirm whether that is intentional
            if addr != "192.168.42.1":
                logger.info("Unexpected IpAddrGo value: " + addr)
            self.done = True

        def run_test(self, *args):
            # Nothing to trigger here; group formation was started above
            logger.debug("run_test")
            return False

        def success(self):
            return self.done

    with TestDbusP2p(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5410
def test_dbus_introspect(dev, apdev):
    """D-Bus introspection"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])

    def introspect():
        # Single Introspect() call on the interface object
        return if_obj.Introspect(WPAS_DBUS_IFACE,
                                 dbus_interface=dbus.INTROSPECTABLE_IFACE)

    res = introspect()
    logger.info("Initial Introspect: " + str(res))
    if res is None or any(name not in res
                          for name in ("Introspectable", "GroupStarted")):
        raise Exception("Unexpected initial Introspect response: " + str(res))
    if any(name not in res for name in ("FastReauth", "PassiveScan")):
        raise Exception("Unexpected initial Introspect response: " + str(res))

    # Allocation failure in wpa_dbus_introspect: no response expected
    with alloc_fail(dev[0], 1, "wpa_dbus_introspect"):
        res2 = introspect()
        logger.info("Introspect: " + str(res2))
        if res2 is not None:
            raise Exception("Unexpected Introspect response")

    # Allocation failures further down the call chain: a response is still
    # expected, but it has to be shorter than the initial one
    partial_failures = [
        (1, "=add_interface;wpa_dbus_introspect"),
        (1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"),
        (2, "=add_interface;wpa_dbus_introspect"),
    ]
    for count, funcs in partial_failures:
        with alloc_fail(dev[0], count, funcs):
            res2 = introspect()
            logger.info("Introspect: " + str(res2))
            if res2 is None:
                raise Exception("No Introspect response")
            if len(res2) >= len(res):
                raise Exception("Unexpected Introspect response")
5456
def run_busctl(service, obj):
    """Run "busctl introspect" on a D-Bus object and check its output

    service: D-Bus service name given to busctl
    obj: D-Bus object path given to busctl

    Raises HwsimSkip if busctl is not available and Exception if busctl
    reports "Duplicate property" on stderr.
    """
    if not shutil.which("busctl"):
        raise HwsimSkip("No busctl available")
    logger.info("busctl introspect %s %s" % (service, obj))
    cmd = subprocess.Popen(['busctl', 'introspect', service, obj],
                           stdout=subprocess.PIPE,
                           stderr=subprocess.PIPE)
    # communicate() reads both pipes to EOF and waits for the process to
    # terminate, so no separate wait() is needed.
    out, err = cmd.communicate()
    # Decode once, without failing on unexpected bytes, so that the log shows
    # readable text instead of a bytes repr.
    out = out.decode(errors='replace')
    err = err.decode(errors='replace')
    logger.info("busctl stdout:\n%s" % out.strip())
    if err:
        logger.info("busctl stderr: %s" % err.strip())
    if "Duplicate property" in err:
        raise Exception("Duplicate property")
5471
def test_dbus_introspect_busctl(dev, apdev):
    """D-Bus introspection with busctl"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    ifaces = dbus_get(dbus, wpas_obj, "Interfaces")

    # Root object, Interfaces node, and the first interface object
    run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH)
    run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH + "/Interfaces")
    iface_path = ifaces[0]
    run_busctl(WPAS_DBUS_SERVICE, iface_path)

    # Create a BSS entry and a network entry to introspect
    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    dev[0].scan_for_bss(apdev[0]['bssid'], freq=2412)
    net_id = dev[0].add_network()
    dev[0].set_network(net_id, "disabled", "0")
    dev[0].set_network_quoted(net_id, "ssid", "test")

    for suffix in ("/BSSs/0", "/Networks/0"):
        run_busctl(WPAS_DBUS_SERVICE, iface_path + suffix)
5489
def test_dbus_ap(dev, apdev):
    """D-Bus AddNetwork for AP mode"""
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    class TestDbusConnect(TestDbus):
        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            # One flag per expected signal
            self.network_selected = False
            self.started = False
            self.sta_added = False
            self.sta_removed = False
            self.authorized = False
            self.deauthorized = False
            # Result of the Stations property checks done in the
            # StationAdded/StationRemoved handlers
            self.stations = False

        def __enter__(self):
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded")
            self.add_signal(self.networkSelected, WPAS_DBUS_IFACE,
                            "NetworkSelected")
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.add_signal(self.stationAdded, WPAS_DBUS_IFACE, "StationAdded")
            self.add_signal(self.stationRemoved, WPAS_DBUS_IFACE,
                            "StationRemoved")
            self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE,
                            "StaAuthorized")
            self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE,
                            "StaDeauthorized")
            self.loop.run()
            return self

        def networkAdded(self, network, properties):
            logger.debug("networkAdded: %s" % str(network))
            logger.debug(str(properties))

        def networkSelected(self, network):
            logger.debug("networkSelected: %s" % str(network))
            self.network_selected = True

        def propertiesChanged(self, properties):
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'State' in properties and properties['State'] == "completed":
                # The AP mode network is up; connect a station to it
                self.started = True
                dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
                dev1.connect(ssid, psk=passphrase, scan_freq="2412")

        def stationAdded(self, station, properties):
            logger.debug("stationAdded: %s" % str(station))
            logger.debug(str(properties))
            self.sta_added = True
            res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
                             dbus_interface=dbus.PROPERTIES_IFACE)
            logger.info("Stations: " + str(res))
            if len(res) == 1:
                self.stations = True
            else:
                # An exception raised from a signal handler does not reach
                # the test itself, so success() also checks self.stations.
                raise Exception("Missing Stations entry: " + str(res))

        def stationRemoved(self, station):
            logger.debug("stationRemoved: %s" % str(station))
            self.sta_removed = True
            res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations',
                             dbus_interface=dbus.PROPERTIES_IFACE)
            logger.info("Stations: " + str(res))
            if len(res) != 0:
                # Recorded for success(); see the note in stationAdded()
                self.stations = False
                raise Exception("Unexpected Stations entry: " + str(res))
            self.loop.quit()

        def staAuthorized(self, name):
            logger.debug("staAuthorized: " + name)
            self.authorized = True
            # Disconnect the station to trigger the removal signals
            dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1')
            dev1.request("DISCONNECT")

        def staDeauthorized(self, name):
            logger.debug("staDeauthorized: " + name)
            self.deauthorized = True

        def run_connect(self, *args):
            logger.debug("run_connect")
            # 'mode': 2 is what this test uses to request AP mode
            args = dbus.Dictionary({'ssid': ssid,
                                    'key_mgmt': 'WPA-PSK',
                                    'psk': passphrase,
                                    'mode': 2,
                                    'frequency': 2412,
                                    'scan_freq': 2412},
                                   signature='sv')
            self.netw = iface.AddNetwork(args)
            iface.SelectNetwork(self.netw)
            # Returning False makes this a one-shot timeout callback
            return False

        def success(self):
            # self.stations makes the Stations property checks count towards
            # the test result instead of only being logged.
            return self.started and self.sta_added and self.sta_removed and \
                self.authorized and self.deauthorized and self.stations

    with TestDbusConnect(bus) as t:
        if not t.success():
            raise Exception("Expected signals not seen")
5594
def test_dbus_ap_scan(dev, apdev):
    """D-Bus AddNetwork for AP mode and scan"""
    # Scenario: add and select a mode=2 network on dev[0] over D-Bus, request
    # an active scan once the interface State becomes "completed", and pass
    # when a ScanDone signal is seen after that point.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'

    hapd = hostapd.add_ap(apdev[0], {"ssid": "open"})
    bssid = hapd.own_addr()

    class TestDbusConnect(TestDbus):
        """Tracks network start and scan completion from D-Bus signals."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.scan_completed = False
            self.started = False

        def __enter__(self):
            # Timers first (run_connect after 1 ms, self.timeout after
            # 15000 ms), then the signal handlers, then run the main loop.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.propertiesChanged, "PropertiesChanged"),
                        (self.scanDone, "ScanDone"))
            for handler, sig_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, sig_name)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            """Trigger the scan when State reports "completed"."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') != "completed":
                return
            self.started = True
            logger.info("Try to scan in AP mode")
            scan_args = {'Type': 'active',
                         'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
            iface.Scan(scan_args)
            logger.info("Scan() returned")

        def scanDone(self, success):
            """Stop the loop on the first ScanDone seen after the start."""
            logger.debug("scanDone: success=%s" % success)
            if not self.started:
                return
            self.scan_completed = True
            self.loop.quit()

        def run_connect(self, *args):
            """Add and select the mode=2 network; returns False."""
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'WPA-PSK',
                       'psk': passphrase,
                       'mode': 2,
                       'frequency': 2412,
                       'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(network,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            return self.started and self.scan_completed

    with TestDbusConnect(bus) as ctx:
        passed = ctx.success()
        if not passed:
            raise Exception("Expected signals not seen")
5655
def test_dbus_connect_wpa_eap(dev, apdev):
    """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP"""
    # Scenario: add and select a PEAP/MSCHAPV2 network over D-Bus and pass
    # once the interface State property reports "completed".
    skip_without_tkip(dev[0])
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa-eap"
    params = hostapd.wpa_eap_params(ssid=ssid)
    for key, value in (("wpa", "3"), ("rsn_pairwise", "CCMP")):
        params[key] = value
    hapd = hostapd.add_ap(apdev[0], params)

    class TestDbusConnect(TestDbus):
        """Waits for the connection to complete; EAP signals are logged."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # run_connect fires after 1 ms, self.timeout after 15000 ms.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.propertiesChanged, "PropertiesChanged"),
                        (self.eap, "EAP"))
            for handler, sig_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE, sig_name)
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            """Finish as soon as State reports "completed"."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') != "completed":
                return
            self.done = True
            self.loop.quit()

        def eap(self, status, parameter):
            # Logged only; the pass/fail decision does not use EAP signals.
            logger.debug("EAP: status=%s parameter=%s" % (status, parameter))

        def run_connect(self, *args):
            """Add and select the enterprise network; returns False."""
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'WPA-EAP',
                       'eap': 'PEAP',
                       'identity': 'user',
                       'password': 'password',
                       'ca_cert': 'auth_serv/ca.pem',
                       'phase2': 'auth=MSCHAPV2',
                       'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(network,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            return self.done

    with TestDbusConnect(bus) as ctx:
        passed = ctx.success()
        if not passed:
            raise Exception("Expected signals not seen")
5712
def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    """AP_SCAN 2 AP mode and D-Bus Scan()"""
    # The test steps live in the helper, which sets AP_SCAN 2 on dev[0].
    # The finally clause sends AP_SCAN 1 whether the helper returned
    # normally or raised.
    try:
        _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev)
    finally:
        dev[0].request("AP_SCAN 1")
5719
def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev):
    # Steps: set AP_SCAN 2, start an open mode=2 network on dev[0], issue a
    # D-Bus Scan() while wpa_driver_nl80211_scan is set up to fail, verify
    # that no AP-DISABLED event follows, then connect dev[1] to the network.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    ap = dev[0]
    sta = dev[1]

    if "OK" not in ap.request("AP_SCAN 2"):
        raise Exception("Failed to set AP_SCAN 2")

    net_id = ap.add_network()
    ap.set_network(net_id, "mode", "2")
    ap.set_network_quoted(net_id, "ssid", "wpas-ap-open")
    for field, value in (("key_mgmt", "NONE"),
                         ("frequency", "2412"),
                         ("scan_freq", "2412"),
                         ("disabled", "0")):
        ap.set_network(net_id, field, value)
    ap.select_network(net_id)
    if ap.wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) is None:
        raise Exception("AP failed to start")

    with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"):
        scan_args = {'Type': 'active',
                     'AllowRoam': True,
                     'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}
        iface.Scan(scan_args)
        ev = ap.wait_event(["CTRL-EVENT-SCAN-FAILED", "AP-DISABLED"],
                           timeout=5)
        if ev is None:
            raise Exception("CTRL-EVENT-SCAN-FAILED not seen")
        if "AP-DISABLED" in ev:
            raise Exception("Unexpected AP-DISABLED event")
        if "retry=1" in ev:
            # A retry was announced; its outcome has to be checked as well.
            ev = ap.wait_event(["CTRL-EVENT-SCAN-FAILED", "AP-DISABLED"],
                               timeout=5)
            if ev is None:
                raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry")
            if "AP-DISABLED" in ev:
                raise Exception("Unexpected AP-DISABLED event - retry")

    sta.connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412")
    sta.request("DISCONNECT")
    sta.wait_disconnected()
    ap.request("DISCONNECT")
    ap.wait_disconnected()
5763
def test_dbus_expectdisconnect(dev, apdev):
    """D-Bus ExpectDisconnect"""
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE)
    sta = dev[0]

    hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"})
    sta.connect("test-open", key_mgmt="NONE", scan_freq="2412")

    # Coverage only: the call is made so that its code path is executed; the
    # resulting behavior is not checked beyond the disconnection below.
    wpas.ExpectDisconnect()
    sta.request("DISCONNECT")
    sta.wait_disconnected()
5778
def test_dbus_save_config(dev, apdev):
    """D-Bus SaveConfig"""
    # SaveConfig() is expected to be refused here with a specific error.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)
    try:
        iface.SaveConfig()
    except dbus.exceptions.DBusException as e:
        msg = str(e)
        if not msg.startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"):
            raise Exception("Unexpected error message for SaveConfig(): " + msg)
    else:
        raise Exception("SaveConfig() accepted unexpectedly")
5789
def test_dbus_vendor_elem(dev, apdev):
    """D-Bus vendor element operations"""
    # The test steps live in the helper. The finally clause sends
    # VENDOR_ELEM_REMOVE for ID 1 with "*" so that elements added by the
    # helper are cleared even when it raises part-way through.
    try:
        _test_dbus_vendor_elem(dev, apdev)
    finally:
        dev[0].request("VENDOR_ELEM_REMOVE 1 *")
5796
def _test_dbus_vendor_elem(dev, apdev):
    """Exercise VendorElemAdd/VendorElemGet/VendorElemRem over D-Bus.

    Invalid calls must be rejected with a D-Bus error whose text contains
    "InvalidArgs" and the expected detail; valid calls must store, append,
    return and remove vendor elements for ID 1.

    Raises:
        Exception: if an invalid call is accepted, if an error message does
            not contain the expected text, or if the returned element data
            does not match what was added.
    """
    (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    def expect_invalid_args(call, detail, accepted, label):
        """Run call() and require an InvalidArgs error mentioning detail.

        accepted is the failure text used when call() does not raise; label
        identifies the check in the "unexpected error message" failure.
        """
        try:
            call()
        except dbus.exceptions.DBusException as e:
            msg = str(e)
            if "InvalidArgs" not in msg or detail not in msg:
                raise Exception("Unexpected error message for invalid " +
                                label + ": " + msg)
            return
        raise Exception(accepted)

    def verify_stored(expected, tag):
        """Compare VendorElemGet(1) against expected, byte by byte."""
        val = iface.VendorElemGet(1)
        if len(val) != len(expected):
            raise Exception("Unexpected VendorElemGet length" + tag)
        for got, want in zip(val, expected):
            if got != dbus.Byte(want):
                raise Exception("Unexpected VendorElemGet data" + tag)

    # Make sure nothing is stored for ID 1 before the checks begin.
    dev[0].request("VENDOR_ELEM_REMOVE 1 *")

    # VendorElemAdd() argument validation
    expect_invalid_args(
        lambda: iface.VendorElemAdd(-1, dbus.ByteArray(b"\x00\x00")),
        "Invalid ID",
        "Invalid VendorElemAdd() accepted", "VendorElemAdd[1]")
    expect_invalid_args(
        lambda: iface.VendorElemAdd(1, dbus.ByteArray(b'')),
        "Invalid value",
        "Invalid VendorElemAdd() accepted", "VendorElemAdd[2]")
    expect_invalid_args(
        lambda: iface.VendorElemAdd(1, dbus.ByteArray(b"\x00\x01")),
        "Parse error",
        "Invalid VendorElemAdd() accepted", "VendorElemAdd[3]")

    # VendorElemGet() argument validation (nothing is stored yet)
    expect_invalid_args(
        lambda: iface.VendorElemGet(-1),
        "Invalid ID",
        "Invalid VendorElemGet() accepted", "VendorElemGet[1]")
    expect_invalid_args(
        lambda: iface.VendorElemGet(1),
        "ID value does not exist",
        "Invalid VendorElemGet() accepted", "VendorElemGet[2]")

    # VendorElemRem() argument validation; each check has its own label so
    # that a failure can be traced to the exact call.
    expect_invalid_args(
        lambda: iface.VendorElemRem(-1, dbus.ByteArray(b"\x00\x00")),
        "Invalid ID",
        "Invalid VendorElemRemove() accepted", "VendorElemRemove[1]")
    expect_invalid_args(
        lambda: iface.VendorElemRem(1, dbus.ByteArray(b'')),
        "Invalid value",
        "Invalid VendorElemRemove() accepted", "VendorElemRemove[2]")

    iface.VendorElemRem(1, b"*")

    # Store one element and read it back.
    ie = dbus.ByteArray(b"\x00\x01\x00")
    iface.VendorElemAdd(1, ie)
    verify_stored(ie, "")

    # A second add is expected to append to the stored data.
    ie2 = dbus.ByteArray(b"\xe0\x00")
    iface.VendorElemAdd(1, ie2)
    verify_stored(ie + ie2, "[2]")

    expect_invalid_args(
        lambda: iface.VendorElemRem(1, dbus.ByteArray(b"\x01\x01")),
        "Parse error",
        "Invalid VendorElemRemove() accepted", "VendorElemRemove[3]")

    # Removing the first element must leave data of the second one's length.
    iface.VendorElemRem(1, ie)
    val = iface.VendorElemGet(1)
    if len(val) != len(ie2):
        raise Exception("Unexpected VendorElemGet length[3]")

    # After removing everything, reading ID 1 must fail again.
    iface.VendorElemRem(1, b"*")
    expect_invalid_args(
        lambda: iface.VendorElemGet(1),
        "ID value does not exist",
        "Invalid VendorElemGet() accepted after removal",
        "VendorElemGet after removal")
5900
def test_dbus_assoc_reject(dev, apdev):
    """D-Bus AssocStatusCode"""
    # Scenario: connect to an open AP configured with max_listen_interval=1
    # and pass when the AssocStatusCode property is reported with value 51.
    # NOTE(review): presumably the AP rejects the association because of the
    # listen interval limit - confirm against hostapd behavior.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-open"
    hapd = hostapd.add_ap(apdev[0], {"ssid": ssid,
                                     "max_listen_interval": "1"})

    class TestDbusConnect(TestDbus):
        """Watches PropertiesChanged for the AssocStatusCode property."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.assoc_status_seen = False
            self.state = 0

        def __enter__(self):
            # run_connect fires after 1 ms, self.timeout after 15000 ms.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            """Check the first reported AssocStatusCode and stop the loop."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if 'AssocStatusCode' not in properties:
                return
            status = properties['AssocStatusCode']
            if status == 51:
                self.assoc_status_seen = True
            else:
                logger.info("Unexpected status code: " + str(status))
            # Whatever the code was, disconnect and end the wait.
            iface.Disconnect()
            self.loop.quit()

        def run_connect(self, *args):
            """Add and select the open network; returns False."""
            network = {'ssid': ssid,
                       'key_mgmt': 'NONE',
                       'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(network,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            return self.assoc_status_seen

    with TestDbusConnect(bus) as ctx:
        passed = ctx.success()
        if not passed:
            raise Exception("Expected signals not seen")
5951
def test_dbus_mesh(dev, apdev):
    """D-Bus mesh"""
    # Scenario: dev[1] joins the open mesh first, then wlan0 joins from the
    # main loop. On MeshPeerConnected the MeshPeers and MeshGroup properties
    # are checked and wlan1 leaves; on MeshPeerDisconnected wlan0 leaves; the
    # test passes when MeshGroupRemoved is signalled.
    check_mesh_support(dev[0])
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH)

    add_open_mesh_network(dev[1])
    addr1 = dev[1].own_addr()

    def mesh_property(prop):
        # Read one Mesh interface property with byte arrays enabled.
        return if_obj.Get(WPAS_DBUS_IFACE_MESH, prop,
                          dbus_interface=dbus.PROPERTIES_IFACE,
                          byte_arrays=True)

    class TestDbusMesh(TestDbus):
        """Reacts to the mesh D-Bus signals of the scenario."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.done = False

        def __enter__(self):
            # run_test fires after 1 ms, self.timeout after 15000 ms.
            gobject.timeout_add(1, self.run_test)
            gobject.timeout_add(15000, self.timeout)
            handlers = ((self.meshGroupStarted, "MeshGroupStarted"),
                        (self.meshGroupRemoved, "MeshGroupRemoved"),
                        (self.meshPeerConnected, "MeshPeerConnected"),
                        (self.meshPeerDisconnected, "MeshPeerDisconnected"))
            for handler, sig_name in handlers:
                self.add_signal(handler, WPAS_DBUS_IFACE_MESH, sig_name)
            self.loop.run()
            return self

        def meshGroupStarted(self, args):
            logger.debug("MeshGroupStarted: " + str(args))

        def meshGroupRemoved(self, args):
            """Final step: mark completion and leave the main loop."""
            logger.debug("MeshGroupRemoved: " + str(args))
            self.done = True
            self.loop.quit()

        def meshPeerConnected(self, args):
            """Validate the mesh properties, then remove wlan1's group."""
            logger.debug("MeshPeerConnected: " + str(args))

            peers = mesh_property('MeshPeers')
            logger.debug("MeshPeers: " + str(peers))
            if len(peers) != 1:
                raise Exception("Unexpected number of MeshPeer values")
            peer_hex = binascii.hexlify(peers[0]).decode()
            if peer_hex != addr1.replace(':', ''):
                raise Exception("Unexpected peer address")

            group = mesh_property('MeshGroup')
            logger.debug("MeshGroup: " + str(group))
            if group != b"wpas-mesh-open":
                raise Exception("Unexpected MeshGroup")
            WpaSupplicant('wlan1', '/tmp/wpas-wlan1').mesh_group_remove()

        def meshPeerDisconnected(self, args):
            """Once the peer is gone, remove wlan0's own mesh group."""
            logger.debug("MeshPeerDisconnected: " + str(args))
            WpaSupplicant('wlan0', '/tmp/wpas-wlan0').mesh_group_remove()

        def run_test(self, *args):
            """Have wlan0 join the open mesh network; returns False."""
            logger.debug("run_test")
            add_open_mesh_network(WpaSupplicant('wlan0', '/tmp/wpas-wlan0'))
            return False

        def success(self):
            return self.done

    with TestDbusMesh(bus) as ctx:
        passed = ctx.success()
        if not passed:
            raise Exception("Expected signals not seen")
6026
def test_dbus_roam(dev, apdev):
    """D-Bus Roam"""
    # Scenario: two APs share one SSID/passphrase. After the first
    # "completed" State, Roam() is called towards the AP that is not the
    # current one; the test passes when a later "completed" State update also
    # carries a true RoamComplete value.
    bus, wpas_obj, path, if_obj = prepare_dbus(dev[0])
    iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE)

    ssid = "test-wpa2-psk"
    passphrase = 'qwertyuiop'
    params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase)
    hapd = hostapd.add_ap(apdev[0], params)
    hapd2 = hostapd.add_ap(apdev[1], params)
    # Both BSSs must be in the scan results before connecting.
    bssid = apdev[0]['bssid']
    dev[0].scan_for_bss(bssid, freq=2412)
    bssid2 = apdev[1]['bssid']
    dev[0].scan_for_bss(bssid2, freq=2412)

    class TestDbusConnect(TestDbus):
        """State machine: 0 = connecting, 1 = roam requested, 2 = roamed."""

        def __init__(self, bus):
            TestDbus.__init__(self, bus)
            self.state = 0

        def __enter__(self):
            # run_connect fires after 1 ms, self.timeout after 15000 ms.
            gobject.timeout_add(1, self.run_connect)
            gobject.timeout_add(15000, self.timeout)
            self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE,
                            "PropertiesChanged")
            self.loop.run()
            return self

        def propertiesChanged(self, properties):
            """Advance the state machine on "completed" State updates."""
            logger.debug("propertiesChanged: %s" % str(properties))
            if properties.get('State') != "completed":
                return
            if self.state == 0:
                self.state = 1
                # Look up the BSSID of the BSS we ended up on.
                cur = properties["CurrentBSS"]
                bss_obj = bus.get_object(WPAS_DBUS_SERVICE, cur)
                raw = bss_obj.Get(WPAS_DBUS_BSS, 'BSSID',
                                  dbus_interface=dbus.PROPERTIES_IFACE)
                bssid_str = ':'.join('%02x' % item for item in raw)
                # Roam to whichever AP is not the current one.
                if bssid_str == bssid2:
                    dst = bssid
                else:
                    dst = bssid2
                iface.Roam(dst)
            elif self.state == 1 and properties.get("RoamComplete"):
                self.state = 2
                self.loop.quit()

        def run_connect(self, *args):
            """Add and select the WPA-PSK network; returns False."""
            logger.debug("run_connect")
            network = {'ssid': ssid,
                       'key_mgmt': 'WPA-PSK',
                       'psk': passphrase,
                       'scan_freq': 2412}
            self.netw = iface.AddNetwork(dbus.Dictionary(network,
                                                         signature='sv'))
            iface.SelectNetwork(self.netw)
            return False

        def success(self):
            return self.state == 2

    with TestDbusConnect(bus) as ctx:
        passed = ctx.success()
        if not passed:
            raise Exception("Expected signals not seen")
6094