1# wpa_supplicant D-Bus interface tests 2# Copyright (c) 2014-2015, Jouni Malinen <j@w1.fi> 3# 4# This software may be distributed under the terms of the BSD license. 5# See README for more details. 6 7import binascii 8import logging 9logger = logging.getLogger() 10import subprocess 11import time 12import shutil 13import struct 14import sys 15 16try: 17 if sys.version_info[0] > 2: 18 from gi.repository import GObject as gobject 19 else: 20 import gobject 21 import dbus 22 dbus_imported = True 23except ImportError: 24 dbus_imported = False 25 26import hostapd 27from wpasupplicant import WpaSupplicant 28from utils import * 29from p2p_utils import * 30from test_ap_tdls import connect_2sta_open 31from test_ap_eap import check_altsubject_match_support 32from test_nfc_p2p import set_ip_addr_info 33from test_wpas_mesh import check_mesh_support, add_open_mesh_network 34 35WPAS_DBUS_SERVICE = "fi.w1.wpa_supplicant1" 36WPAS_DBUS_PATH = "/fi/w1/wpa_supplicant1" 37WPAS_DBUS_IFACE = "fi.w1.wpa_supplicant1.Interface" 38WPAS_DBUS_IFACE_WPS = WPAS_DBUS_IFACE + ".WPS" 39WPAS_DBUS_NETWORK = "fi.w1.wpa_supplicant1.Network" 40WPAS_DBUS_BSS = "fi.w1.wpa_supplicant1.BSS" 41WPAS_DBUS_IFACE_P2PDEVICE = WPAS_DBUS_IFACE + ".P2PDevice" 42WPAS_DBUS_P2P_PEER = "fi.w1.wpa_supplicant1.Peer" 43WPAS_DBUS_GROUP = "fi.w1.wpa_supplicant1.Group" 44WPAS_DBUS_PERSISTENT_GROUP = "fi.w1.wpa_supplicant1.PersistentGroup" 45WPAS_DBUS_IFACE_MESH = WPAS_DBUS_IFACE + ".Mesh" 46 47def prepare_dbus(dev): 48 if not dbus_imported: 49 logger.info("No dbus module available") 50 raise HwsimSkip("No dbus module available") 51 try: 52 from dbus.mainloop.glib import DBusGMainLoop 53 dbus.mainloop.glib.DBusGMainLoop(set_as_default=True) 54 bus = dbus.SystemBus() 55 wpas_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH) 56 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 57 path = wpas.GetInterface(dev.ifname) 58 if_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 59 return (bus, wpas_obj, path, if_obj) 60 except Exception as e: 61 raise HwsimSkip("Could not connect to D-Bus: %s" % e) 62 63class TestDbus(object): 64 def __init__(self, bus): 65 self.loop = gobject.MainLoop() 66 self.signals = [] 67 self.bus = bus 68 69 def __exit__(self, type, value, traceback): 70 for s in self.signals: 71 s.remove() 72 73 def add_signal(self, handler, interface, name, byte_arrays=False): 74 s = self.bus.add_signal_receiver(handler, dbus_interface=interface, 75 signal_name=name, 76 byte_arrays=byte_arrays) 77 self.signals.append(s) 78 79 def timeout(self, *args): 80 logger.debug("timeout") 81 self.loop.quit() 82 return False 83 84class alloc_fail_dbus(object): 85 def __init__(self, dev, count, funcs, operation="Operation", 86 expected="NoMemory"): 87 self._dev = dev 88 self._count = count 89 self._funcs = funcs 90 self._operation = operation 91 self._expected = expected 92 def __enter__(self): 93 cmd = "TEST_ALLOC_FAIL %d:%s" % (self._count, self._funcs) 94 if "OK" not in self._dev.request(cmd): 95 raise HwsimSkip("TEST_ALLOC_FAIL not supported") 96 def __exit__(self, type, value, traceback): 97 if type is None: 98 raise Exception("%s succeeded during out-of-memory" % self._operation) 99 if type == dbus.exceptions.DBusException and self._expected in str(value): 100 return True 101 if self._dev.request("GET_ALLOC_FAIL") != "0:%s" % self._funcs: 102 raise Exception("%s did not trigger allocation failure" % self._operation) 103 return False 104 105def start_ap(ap, ssid="test-wps", 106 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7e"): 107 params = {"ssid": ssid, "eap_server": "1", "wps_state": "2", 108 "wpa_passphrase": "12345678", "wpa": "2", 109 "wpa_key_mgmt": "WPA-PSK", "rsn_pairwise": "CCMP", 110 "ap_pin": "12345670", "uuid": ap_uuid} 111 return hostapd.add_ap(ap, params) 112 113def test_dbus_getall(dev, apdev): 114 """D-Bus GetAll""" 115 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 116 117 props = wpas_obj.GetAll(WPAS_DBUS_SERVICE, 118 dbus_interface=dbus.PROPERTIES_IFACE) 119 logger.debug("GetAll(fi.w1.wpa.supplicant1, /fi/w1/wpa_supplicant1) ==> " + str(props)) 120 121 props = if_obj.GetAll(WPAS_DBUS_IFACE, 122 dbus_interface=dbus.PROPERTIES_IFACE) 123 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE, path, str(props))) 124 125 props = if_obj.GetAll(WPAS_DBUS_IFACE_WPS, 126 dbus_interface=dbus.PROPERTIES_IFACE) 127 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_IFACE_WPS, path, str(props))) 128 129 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 130 dbus_interface=dbus.PROPERTIES_IFACE) 131 if len(res) != 0: 132 raise Exception("Unexpected BSSs entry: " + str(res)) 133 134 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 135 dbus_interface=dbus.PROPERTIES_IFACE) 136 if len(res) != 0: 137 raise Exception("Unexpected Networks entry: " + str(res)) 138 139 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 140 bssid = apdev[0]['bssid'] 141 dev[0].scan_for_bss(bssid, freq=2412) 142 id = dev[0].add_network() 143 dev[0].set_network(id, "disabled", "0") 144 dev[0].set_network_quoted(id, "ssid", "test") 145 146 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 147 dbus_interface=dbus.PROPERTIES_IFACE) 148 if len(res) != 1: 149 raise Exception("Missing BSSs entry: " + str(res)) 150 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 151 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE) 152 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props))) 153 bssid_str = '' 154 for item in props['BSSID']: 155 if len(bssid_str) > 0: 156 bssid_str += ':' 157 bssid_str += '%02x' % item 158 if bssid_str != bssid: 159 raise Exception("Unexpected BSSID in BSSs entry") 160 161 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 162 dbus_interface=dbus.PROPERTIES_IFACE) 163 if len(res) != 1: 164 raise Exception("Missing Networks entry: " + str(res)) 165 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 166 props = net_obj.GetAll(WPAS_DBUS_NETWORK, 167 dbus_interface=dbus.PROPERTIES_IFACE) 168 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_NETWORK, res[0], str(props))) 169 ssid = props['Properties']['ssid'] 170 if ssid != '"test"': 171 raise Exception("Unexpected SSID in network entry") 172 173def test_dbus_getall_oom(dev, apdev): 174 """D-Bus GetAll wpa_config_get_all() OOM""" 175 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 176 177 id = dev[0].add_network() 178 dev[0].set_network(id, "disabled", "0") 179 dev[0].set_network_quoted(id, "ssid", "test") 180 181 res = if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 182 dbus_interface=dbus.PROPERTIES_IFACE) 183 if len(res) != 1: 184 raise Exception("Missing Networks entry: " + str(res)) 185 net_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 186 for i in range(1, 50): 187 with alloc_fail(dev[0], i, "wpa_config_get_all"): 188 try: 189 props = net_obj.GetAll(WPAS_DBUS_NETWORK, 190 dbus_interface=dbus.PROPERTIES_IFACE) 191 except dbus.exceptions.DBusException as e: 192 pass 193 194def dbus_get(dbus, wpas_obj, prop, expect=None, byte_arrays=False): 195 val = wpas_obj.Get(WPAS_DBUS_SERVICE, prop, 196 dbus_interface=dbus.PROPERTIES_IFACE, 197 byte_arrays=byte_arrays) 198 if expect is not None and val != expect: 199 raise Exception("Unexpected %s: %s (expected: %s)" % 200 (prop, str(val), str(expect))) 201 return val 202 203def dbus_set(dbus, wpas_obj, prop, val): 204 wpas_obj.Set(WPAS_DBUS_SERVICE, prop, val, 205 dbus_interface=dbus.PROPERTIES_IFACE) 206 207def test_dbus_properties(dev, apdev): 208 """D-Bus Get/Set fi.w1.wpa_supplicant1 properties""" 209 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 210 211 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump") 212 dbus_set(dbus, wpas_obj, "DebugLevel", "debug") 213 dbus_get(dbus, wpas_obj, "DebugLevel", expect="debug") 214 for (val, err) in [(3, "Error.Failed: wrong property type"), 215 ("foo", "Error.Failed: wrong debug level value")]: 216 try: 217 dbus_set(dbus, wpas_obj, "DebugLevel", val) 218 raise Exception("Invalid DebugLevel value accepted: " + str(val)) 219 except dbus.exceptions.DBusException as e: 220 if err not in str(e): 221 raise Exception("Unexpected error message: " + str(e)) 222 dbus_set(dbus, wpas_obj, "DebugLevel", "msgdump") 223 dbus_get(dbus, wpas_obj, "DebugLevel", expect="msgdump") 224 225 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True) 226 dbus_set(dbus, wpas_obj, "DebugTimestamp", False) 227 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=False) 228 try: 229 dbus_set(dbus, wpas_obj, "DebugTimestamp", "foo") 230 raise Exception("Invalid DebugTimestamp value accepted") 231 except dbus.exceptions.DBusException as e: 232 if "Error.Failed: wrong property type" not in str(e): 233 raise Exception("Unexpected error message: " + str(e)) 234 dbus_set(dbus, wpas_obj, "DebugTimestamp", True) 235 dbus_get(dbus, wpas_obj, "DebugTimestamp", expect=True) 236 237 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True) 238 dbus_set(dbus, wpas_obj, "DebugShowKeys", False) 239 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=False) 240 try: 241 dbus_set(dbus, wpas_obj, "DebugShowKeys", "foo") 242 raise Exception("Invalid DebugShowKeys value accepted") 243 except dbus.exceptions.DBusException as e: 244 if "Error.Failed: wrong property type" not in str(e): 245 raise Exception("Unexpected error message: " + str(e)) 246 dbus_set(dbus, wpas_obj, "DebugShowKeys", True) 247 dbus_get(dbus, wpas_obj, "DebugShowKeys", expect=True) 248 249 res = dbus_get(dbus, wpas_obj, "Interfaces") 250 if len(res) != 1: 251 raise Exception("Unexpected Interfaces value: " + str(res)) 252 253 res = dbus_get(dbus, wpas_obj, "EapMethods") 254 if len(res) < 5 or "TTLS" not in res: 255 raise Exception("Unexpected EapMethods value: " + str(res)) 256 257 res = dbus_get(dbus, wpas_obj, "Capabilities") 258 if len(res) < 2 or "p2p" not in res: 259 raise Exception("Unexpected Capabilities value: " + str(res)) 260 261 dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) 262 val = binascii.unhexlify("010006020304050608") 263 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val)) 264 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) 265 if val != res: 266 raise Exception("WFDIEs value changed") 267 try: 268 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'\x00')) 269 raise Exception("Invalid WFDIEs value accepted") 270 except dbus.exceptions.DBusException as e: 271 if "InvalidArgs" not in str(e): 272 raise Exception("Unexpected error message: " + str(e)) 273 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'')) 274 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(val)) 275 dbus_set(dbus, wpas_obj, "WFDIEs", dbus.ByteArray(b'')) 276 res = dbus_get(dbus, wpas_obj, "WFDIEs", byte_arrays=True) 277 if len(res) != 0: 278 raise Exception("WFDIEs not cleared properly") 279 280 res = dbus_get(dbus, wpas_obj, "EapMethods") 281 try: 282 dbus_set(dbus, wpas_obj, "EapMethods", res) 283 raise Exception("Invalid Set accepted") 284 except dbus.exceptions.DBusException as e: 285 if "InvalidArgs: Property is read-only" not in str(e): 286 raise Exception("Unexpected error message: " + str(e)) 287 288 try: 289 wpas_obj.SetFoo(WPAS_DBUS_SERVICE, "DebugShowKeys", True, 290 dbus_interface=dbus.PROPERTIES_IFACE) 291 raise Exception("Unknown method accepted") 292 except dbus.exceptions.DBusException as e: 293 if "UnknownMethod" not in str(e): 294 raise Exception("Unexpected error message: " + str(e)) 295 296 try: 297 wpas_obj.Get("foo", "DebugShowKeys", 298 dbus_interface=dbus.PROPERTIES_IFACE) 299 raise Exception("Invalid Get accepted") 300 except dbus.exceptions.DBusException as e: 301 if "InvalidArgs: No such property" not in str(e): 302 raise Exception("Unexpected error message: " + str(e)) 303 304 test_obj = bus.get_object(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH, 305 introspect=False) 306 try: 307 test_obj.Get(123, "DebugShowKeys", 308 dbus_interface=dbus.PROPERTIES_IFACE) 309 raise Exception("Invalid Get accepted") 310 except dbus.exceptions.DBusException as e: 311 if "InvalidArgs: Invalid arguments" not in str(e): 312 raise Exception("Unexpected error message: " + str(e)) 313 try: 314 test_obj.Get(WPAS_DBUS_SERVICE, 123, 315 dbus_interface=dbus.PROPERTIES_IFACE) 316 raise Exception("Invalid Get accepted") 317 except dbus.exceptions.DBusException as e: 318 if "InvalidArgs: Invalid arguments" not in str(e): 319 raise Exception("Unexpected error message: " + str(e)) 320 321 try: 322 wpas_obj.Set(WPAS_DBUS_SERVICE, "WFDIEs", 323 dbus.ByteArray(b'', variant_level=2), 324 dbus_interface=dbus.PROPERTIES_IFACE) 325 raise Exception("Invalid Set accepted") 326 except dbus.exceptions.DBusException as e: 327 if "InvalidArgs: invalid message format" not in str(e): 328 raise Exception("Unexpected error message: " + str(e)) 329 330def test_dbus_set_global_properties(dev, apdev): 331 """D-Bus Get/Set fi.w1.wpa_supplicant1 interface global properties""" 332 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 333 334 dev[0].set("model_name", "") 335 props = [('Okc', '0', '1'), ('ModelName', '', 'blahblahblah')] 336 337 for p in props: 338 res = if_obj.Get(WPAS_DBUS_IFACE, p[0], 339 dbus_interface=dbus.PROPERTIES_IFACE) 340 if res != p[1]: 341 raise Exception("Unexpected " + p[0] + " value: " + str(res)) 342 343 if_obj.Set(WPAS_DBUS_IFACE, p[0], p[2], 344 dbus_interface=dbus.PROPERTIES_IFACE) 345 346 res = if_obj.Get(WPAS_DBUS_IFACE, p[0], 347 dbus_interface=dbus.PROPERTIES_IFACE) 348 if res != p[2]: 349 raise Exception("Unexpected " + p[0] + " value after set: " + str(res)) 350 dev[0].set("model_name", "") 351 352def test_dbus_invalid_method(dev, apdev): 353 """D-Bus invalid method""" 354 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 355 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 356 357 try: 358 wps.Foo() 359 raise Exception("Unknown method accepted") 360 except dbus.exceptions.DBusException as e: 361 if "UnknownMethod" not in str(e): 362 raise Exception("Unexpected error message: " + str(e)) 363 364 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False) 365 test_wps = dbus.Interface(test_obj, WPAS_DBUS_IFACE_WPS) 366 try: 367 test_wps.Start(123) 368 raise Exception("WPS.Start with incorrect signature accepted") 369 except dbus.exceptions.DBusException as e: 370 if "InvalidArgs: Invalid arg" not in str(e): 371 raise Exception("Unexpected error message: " + str(e)) 372 373def test_dbus_get_set_wps(dev, apdev): 374 """D-Bus Get/Set for WPS properties""" 375 try: 376 _test_dbus_get_set_wps(dev, apdev) 377 finally: 378 dev[0].request("SET wps_cred_processing 0") 379 dev[0].request("SET config_methods display keypad virtual_display nfc_interface p2ps") 380 dev[0].set("device_name", "Device A") 381 dev[0].set("manufacturer", "") 382 dev[0].set("model_name", "") 383 dev[0].set("model_number", "") 384 dev[0].set("serial_number", "") 385 dev[0].set("device_type", "0-00000000-0") 386 387def _test_dbus_get_set_wps(dev, apdev): 388 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 389 390 if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods", 391 dbus_interface=dbus.PROPERTIES_IFACE) 392 393 val = "display keypad virtual_display nfc_interface" 394 dev[0].request("SET config_methods " + val) 395 396 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods", 397 dbus_interface=dbus.PROPERTIES_IFACE) 398 if config != val: 399 raise Exception("Unexpected Get(ConfigMethods) result: " + config) 400 401 val2 = "push_button display" 402 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2, 403 dbus_interface=dbus.PROPERTIES_IFACE) 404 config = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ConfigMethods", 405 dbus_interface=dbus.PROPERTIES_IFACE) 406 if config != val2: 407 raise Exception("Unexpected Get(ConfigMethods) result after Set: " + config) 408 409 dev[0].request("SET config_methods " + val) 410 411 for i in range(3): 412 dev[0].request("SET wps_cred_processing " + str(i)) 413 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 414 dbus_interface=dbus.PROPERTIES_IFACE) 415 expected_val = False if i == 1 else True 416 if val != expected_val: 417 raise Exception("Unexpected Get(ProcessCredentials) result({}): {}".format(i, val)) 418 419 tests = [("device_name", "DeviceName"), 420 ("manufacturer", "Manufacturer"), 421 ("model_name", "ModelName"), 422 ("model_number", "ModelNumber"), 423 ("serial_number", "SerialNumber")] 424 425 for f1, f2 in tests: 426 val2 = "test-value-test" 427 dev[0].set(f1, val2) 428 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2, 429 dbus_interface=dbus.PROPERTIES_IFACE) 430 if val != val2: 431 raise Exception("Get(%s) returned unexpected value" % f2) 432 val2 = "TEST-value" 433 if_obj.Set(WPAS_DBUS_IFACE_WPS, f2, val2, 434 dbus_interface=dbus.PROPERTIES_IFACE) 435 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, f2, 436 dbus_interface=dbus.PROPERTIES_IFACE) 437 if val != val2: 438 raise Exception("Get(%s) returned unexpected value after Set" % f2) 439 440 dev[0].set("device_type", "5-0050F204-1") 441 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", 442 dbus_interface=dbus.PROPERTIES_IFACE) 443 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01: 444 raise Exception("DeviceType mismatch") 445 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", val, 446 dbus_interface=dbus.PROPERTIES_IFACE) 447 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", 448 dbus_interface=dbus.PROPERTIES_IFACE) 449 if val[0] != 0x00 or val[1] != 0x05 != val[2] != 0x00 or val[3] != 0x50 or val[4] != 0xf2 or val[5] != 0x04 or val[6] != 0x00 or val[7] != 0x01: 450 raise Exception("DeviceType mismatch after Set") 451 452 val2 = b'\x01\x02\x03\x04\x05\x06\x07\x08' 453 if_obj.Set(WPAS_DBUS_IFACE_WPS, "DeviceType", dbus.ByteArray(val2), 454 dbus_interface=dbus.PROPERTIES_IFACE) 455 val = if_obj.Get(WPAS_DBUS_IFACE_WPS, "DeviceType", 456 dbus_interface=dbus.PROPERTIES_IFACE, 457 byte_arrays=True) 458 if val != val2: 459 raise Exception("DeviceType mismatch after Set (2)") 460 461 class TestDbusGetSet(TestDbus): 462 def __init__(self, bus): 463 TestDbus.__init__(self, bus) 464 self.signal_received = False 465 self.signal_received_deprecated = False 466 self.sets_done = False 467 468 def __enter__(self): 469 gobject.timeout_add(1, self.run_sets) 470 gobject.timeout_add(1000, self.timeout) 471 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE_WPS, 472 "PropertiesChanged") 473 self.add_signal(self.propertiesChanged2, dbus.PROPERTIES_IFACE, 474 "PropertiesChanged") 475 self.loop.run() 476 return self 477 478 def propertiesChanged(self, properties): 479 logger.debug("PropertiesChanged: " + str(properties)) 480 if "ProcessCredentials" in properties: 481 self.signal_received_deprecated = True 482 if self.sets_done and self.signal_received: 483 self.loop.quit() 484 485 def propertiesChanged2(self, interface_name, changed_properties, 486 invalidated_properties): 487 logger.debug("propertiesChanged2: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 488 if interface_name != WPAS_DBUS_IFACE_WPS: 489 return 490 if "ProcessCredentials" in changed_properties: 491 self.signal_received = True 492 if self.sets_done and self.signal_received_deprecated: 493 self.loop.quit() 494 495 def run_sets(self, *args): 496 logger.debug("run_sets") 497 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 498 dbus.Boolean(1), 499 dbus_interface=dbus.PROPERTIES_IFACE) 500 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 501 dbus_interface=dbus.PROPERTIES_IFACE) != True: 502 raise Exception("Unexpected Get(ProcessCredentials) result after Set") 503 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 504 dbus.Boolean(0), 505 dbus_interface=dbus.PROPERTIES_IFACE) 506 if if_obj.Get(WPAS_DBUS_IFACE_WPS, "ProcessCredentials", 507 dbus_interface=dbus.PROPERTIES_IFACE) != False: 508 raise Exception("Unexpected Get(ProcessCredentials) result after Set") 509 510 self.dbus_sets_done = True 511 return False 512 513 def success(self): 514 return self.signal_received and self.signal_received_deprecated 515 516 with TestDbusGetSet(bus) as t: 517 if not t.success(): 518 raise Exception("No signal received for ProcessCredentials change") 519 520def test_dbus_wps_invalid(dev, apdev): 521 """D-Bus invaldi WPS operation""" 522 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 523 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 524 525 failures = [{'Role': 'foo', 'Type': 'pbc'}, 526 {'Role': 123, 'Type': 'pbc'}, 527 {'Type': 'pbc'}, 528 {'Role': 'enrollee'}, 529 {'Role': 'registrar'}, 530 {'Role': 'enrollee', 'Type': 123}, 531 {'Role': 'enrollee', 'Type': 'foo'}, 532 {'Role': 'enrollee', 'Type': 'pbc', 533 'Bssid': '02:33:44:55:66:77'}, 534 {'Role': 'enrollee', 'Type': 'pin', 'Pin': 123}, 535 {'Role': 'enrollee', 'Type': 'pbc', 536 'Bssid': dbus.ByteArray(b'12345')}, 537 {'Role': 'enrollee', 'Type': 'pbc', 538 'P2PDeviceAddress': 12345}, 539 {'Role': 'enrollee', 'Type': 'pbc', 540 'P2PDeviceAddress': dbus.ByteArray(b'12345')}, 541 {'Role': 'enrollee', 'Type': 'pbc', 'Foo': 'bar'}] 542 for args in failures: 543 try: 544 wps.Start(args) 545 raise Exception("Invalid WPS.Start() arguments accepted: " + str(args)) 546 except dbus.exceptions.DBusException as e: 547 if not str(e).startswith("fi.w1.wpa_supplicant1.InvalidArgs"): 548 raise Exception("Unexpected error message: " + str(e)) 549 550def test_dbus_wps_oom(dev, apdev): 551 """D-Bus WPS operation (OOM)""" 552 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 553 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 554 555 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_state", "Get"): 556 if_obj.Get(WPAS_DBUS_IFACE, "State", 557 dbus_interface=dbus.PROPERTIES_IFACE) 558 559 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 560 bssid = apdev[0]['bssid'] 561 dev[0].scan_for_bss(bssid, freq=2412) 562 563 time.sleep(0.05) 564 for i in range(1, 3): 565 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_bsss", "Get"): 566 if_obj.Get(WPAS_DBUS_IFACE, "BSSs", 567 dbus_interface=dbus.PROPERTIES_IFACE) 568 569 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 570 dbus_interface=dbus.PROPERTIES_IFACE) 571 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 572 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_getter_bss_rates", "Get"): 573 bss_obj.Get(WPAS_DBUS_BSS, "Rates", 574 dbus_interface=dbus.PROPERTIES_IFACE) 575 with alloc_fail(dev[0], 1, 576 "wpa_bss_get_bit_rates;wpas_dbus_getter_bss_rates"): 577 try: 578 bss_obj.Get(WPAS_DBUS_BSS, "Rates", 579 dbus_interface=dbus.PROPERTIES_IFACE) 580 except dbus.exceptions.DBusException as e: 581 pass 582 583 id = dev[0].add_network() 584 dev[0].set_network(id, "disabled", "0") 585 dev[0].set_network_quoted(id, "ssid", "test") 586 587 for i in range(1, 3): 588 with alloc_fail_dbus(dev[0], i, "=wpas_dbus_getter_networks", "Get"): 589 if_obj.Get(WPAS_DBUS_IFACE, "Networks", 590 dbus_interface=dbus.PROPERTIES_IFACE) 591 592 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_getter_interfaces", "Get"): 593 dbus_get(dbus, wpas_obj, "Interfaces") 594 595 for i in range(1, 6): 596 with alloc_fail_dbus(dev[0], i, "=eap_get_names_as_string_array;wpas_dbus_getter_eap_methods", "Get"): 597 dbus_get(dbus, wpas_obj, "EapMethods") 598 599 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_setter_config_methods", "Set", 600 expected="Error.Failed: Failed to set property"): 601 val2 = "push_button display" 602 if_obj.Set(WPAS_DBUS_IFACE_WPS, "ConfigMethods", val2, 603 dbus_interface=dbus.PROPERTIES_IFACE) 604 605 with alloc_fail_dbus(dev[0], 1, "=wpa_config_add_network;wpas_dbus_handler_wps_start", 606 "WPS.Start", 607 expected="UnknownError: WPS start failed"): 608 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670'}) 609 610def test_dbus_wps_pbc(dev, apdev): 611 """D-Bus WPS/PBC operation and signals""" 612 try: 613 _test_dbus_wps_pbc(dev, apdev) 614 finally: 615 dev[0].request("SET wps_cred_processing 0") 616 617def _test_dbus_wps_pbc(dev, apdev): 618 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 619 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 620 621 hapd = start_ap(apdev[0]) 622 hapd.request("WPS_PBC") 623 bssid = apdev[0]['bssid'] 624 dev[0].scan_for_bss(bssid, freq="2412") 625 dev[0].request("SET wps_cred_processing 2") 626 627 res = if_obj.Get(WPAS_DBUS_IFACE, 'BSSs', 628 dbus_interface=dbus.PROPERTIES_IFACE) 629 if len(res) != 1: 630 raise Exception("Missing BSSs entry: " + str(res)) 631 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, res[0]) 632 props = bss_obj.GetAll(WPAS_DBUS_BSS, dbus_interface=dbus.PROPERTIES_IFACE) 633 logger.debug("GetAll(%s, %s): %s" % (WPAS_DBUS_BSS, res[0], str(props))) 634 if 'WPS' not in props: 635 raise Exception("No WPS information in the BSS entry") 636 if 'Type' not in props['WPS']: 637 raise Exception("No Type field in the WPS dictionary") 638 if props['WPS']['Type'] != 'pbc': 639 raise Exception("Unexpected WPS Type: " + props['WPS']['Type']) 640 641 class TestDbusWps(TestDbus): 642 def __init__(self, bus, wps): 643 TestDbus.__init__(self, bus) 644 self.success_seen = False 645 self.credentials_received = False 646 self.wps = wps 647 648 def __enter__(self): 649 gobject.timeout_add(1, self.start_pbc) 650 gobject.timeout_add(15000, self.timeout) 651 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 652 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 653 "Credentials") 654 self.loop.run() 655 return self 656 657 def wpsEvent(self, name, args): 658 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 659 if name == "success": 660 self.success_seen = True 661 if self.credentials_received: 662 self.loop.quit() 663 664 def credentials(self, args): 665 logger.debug("credentials: " + str(args)) 666 self.credentials_received = True 667 if self.success_seen: 668 self.loop.quit() 669 670 def start_pbc(self, *args): 671 logger.debug("start_pbc") 672 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'}) 673 return False 674 675 def success(self): 676 return self.success_seen and self.credentials_received 677 678 with TestDbusWps(bus, wps) as t: 679 if not t.success(): 680 raise Exception("Failure in D-Bus operations") 681 682 dev[0].wait_connected(timeout=10) 683 dev[0].request("DISCONNECT") 684 hapd.disable() 685 dev[0].flush_scan_cache() 686 687def test_dbus_wps_pbc_overlap(dev, apdev): 688 """D-Bus WPS/PBC operation and signal for PBC overlap""" 689 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 690 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 691 692 hapd = start_ap(apdev[0]) 693 hapd2 = start_ap(apdev[1], ssid="test-wps2", 694 ap_uuid="27ea801a-9e5c-4e73-bd82-f89cbcd10d7f") 695 hapd.request("WPS_PBC") 696 hapd2.request("WPS_PBC") 697 bssid = apdev[0]['bssid'] 698 dev[0].scan_for_bss(bssid, freq="2412") 699 bssid2 = apdev[1]['bssid'] 700 dev[0].scan_for_bss(bssid2, freq="2412") 701 702 class TestDbusWps(TestDbus): 703 def __init__(self, bus, wps): 704 TestDbus.__init__(self, bus) 705 self.overlap_seen = False 706 self.wps = wps 707 708 def __enter__(self): 709 gobject.timeout_add(1, self.start_pbc) 710 gobject.timeout_add(15000, self.timeout) 711 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 712 self.loop.run() 713 return self 714 715 def wpsEvent(self, name, args): 716 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 717 if name == "pbc-overlap": 718 self.overlap_seen = True 719 self.loop.quit() 720 721 def start_pbc(self, *args): 722 logger.debug("start_pbc") 723 self.wps.Start({'Role': 'enrollee', 'Type': 'pbc'}) 724 return False 725 726 def success(self): 727 return self.overlap_seen 728 729 with TestDbusWps(bus, wps) as t: 730 if not t.success(): 731 raise Exception("Failure in D-Bus operations") 732 733 dev[0].request("WPS_CANCEL") 734 dev[0].request("DISCONNECT") 735 hapd.disable() 736 dev[0].flush_scan_cache() 737 738def test_dbus_wps_pin(dev, apdev): 739 """D-Bus WPS/PIN operation and signals""" 740 try: 741 _test_dbus_wps_pin(dev, apdev) 742 finally: 743 dev[0].request("SET wps_cred_processing 0") 744 745def _test_dbus_wps_pin(dev, apdev): 746 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 747 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 748 749 hapd = start_ap(apdev[0]) 750 hapd.request("WPS_PIN any 12345670") 751 bssid = apdev[0]['bssid'] 752 dev[0].scan_for_bss(bssid, freq="2412") 753 dev[0].request("SET wps_cred_processing 2") 754 755 class TestDbusWps(TestDbus): 756 def __init__(self, bus): 757 TestDbus.__init__(self, bus) 758 self.success_seen = False 759 self.credentials_received = False 760 761 def __enter__(self): 762 gobject.timeout_add(1, self.start_pin) 763 gobject.timeout_add(15000, self.timeout) 764 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 765 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 766 "Credentials") 767 self.loop.run() 768 return self 769 770 def wpsEvent(self, name, args): 771 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 772 if name == "success": 773 self.success_seen = True 774 if self.credentials_received: 775 self.loop.quit() 776 777 def credentials(self, args): 778 logger.debug("credentials: " + str(args)) 779 self.credentials_received = True 780 if self.success_seen: 781 self.loop.quit() 782 783 def start_pin(self, *args): 784 logger.debug("start_pin") 785 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 786 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 787 'Bssid': bssid_ay}) 788 return False 789 790 def success(self): 791 return self.success_seen and self.credentials_received 792 793 with TestDbusWps(bus) as t: 794 if not t.success(): 795 raise Exception("Failure in D-Bus operations") 796 797 dev[0].wait_connected(timeout=10) 798 799def test_dbus_wps_pin2(dev, apdev): 800 """D-Bus WPS/PIN operation and signals (PIN from wpa_supplicant)""" 801 try: 802 _test_dbus_wps_pin2(dev, apdev) 803 finally: 804 dev[0].request("SET wps_cred_processing 0") 805 806def _test_dbus_wps_pin2(dev, apdev): 807 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 808 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 809 810 hapd = start_ap(apdev[0]) 811 bssid = apdev[0]['bssid'] 812 dev[0].scan_for_bss(bssid, freq="2412") 813 dev[0].request("SET wps_cred_processing 2") 814 815 class TestDbusWps(TestDbus): 816 def __init__(self, bus): 817 TestDbus.__init__(self, bus) 818 self.success_seen = False 819 self.failed = False 820 821 def __enter__(self): 822 gobject.timeout_add(1, self.start_pin) 823 gobject.timeout_add(15000, self.timeout) 824 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 825 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 826 "Credentials") 827 self.loop.run() 828 return self 829 830 def wpsEvent(self, name, args): 831 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 832 if name == "success": 833 self.success_seen = True 834 if self.credentials_received: 835 self.loop.quit() 836 837 def credentials(self, args): 838 logger.debug("credentials: " + str(args)) 839 self.credentials_received = True 840 if self.success_seen: 841 self.loop.quit() 842 843 def start_pin(self, *args): 844 logger.debug("start_pin") 845 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 846 res = wps.Start({'Role': 'enrollee', 'Type': 'pin', 847 'Bssid': bssid_ay}) 848 pin = res['Pin'] 849 h = hostapd.Hostapd(apdev[0]['ifname']) 850 h.request("WPS_PIN any " + pin) 851 return False 852 853 def success(self): 854 return self.success_seen and self.credentials_received 855 856 with TestDbusWps(bus) as t: 857 if not t.success(): 858 raise Exception("Failure in D-Bus operations") 859 860 dev[0].wait_connected(timeout=10) 861 862def test_dbus_wps_pin_m2d(dev, apdev): 863 """D-Bus WPS/PIN operation and signals with M2D""" 864 try: 865 _test_dbus_wps_pin_m2d(dev, apdev) 866 finally: 867 dev[0].request("SET wps_cred_processing 0") 868 869def _test_dbus_wps_pin_m2d(dev, apdev): 870 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 871 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 872 873 hapd = start_ap(apdev[0]) 874 bssid = apdev[0]['bssid'] 875 dev[0].scan_for_bss(bssid, freq="2412") 876 dev[0].request("SET wps_cred_processing 2") 877 878 class TestDbusWps(TestDbus): 879 def __init__(self, bus): 880 TestDbus.__init__(self, bus) 881 self.success_seen = False 882 self.credentials_received = False 883 884 def __enter__(self): 885 gobject.timeout_add(1, self.start_pin) 886 gobject.timeout_add(15000, self.timeout) 887 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 888 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 889 "Credentials") 890 self.loop.run() 891 return self 892 893 def wpsEvent(self, name, args): 894 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 895 if name == "success": 896 self.success_seen = True 897 if self.credentials_received: 898 self.loop.quit() 899 elif name == "m2d": 900 h = hostapd.Hostapd(apdev[0]['ifname']) 901 h.request("WPS_PIN any 12345670") 902 903 def credentials(self, args): 904 logger.debug("credentials: " + str(args)) 905 self.credentials_received = True 906 if self.success_seen: 907 self.loop.quit() 908 909 def start_pin(self, *args): 910 logger.debug("start_pin") 911 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 912 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 913 'Bssid': bssid_ay}) 914 return False 915 916 def success(self): 917 return self.success_seen and self.credentials_received 918 919 with TestDbusWps(bus) as t: 920 if not t.success(): 921 raise Exception("Failure in D-Bus operations") 922 923 dev[0].wait_connected(timeout=10) 924 925def test_dbus_wps_reg(dev, apdev): 926 """D-Bus WPS/Registrar operation and signals""" 927 try: 928 _test_dbus_wps_reg(dev, apdev) 929 finally: 930 dev[0].request("SET wps_cred_processing 0") 931 932def _test_dbus_wps_reg(dev, apdev): 933 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 934 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 935 936 hapd = start_ap(apdev[0]) 937 hapd.request("WPS_PIN any 12345670") 938 bssid = apdev[0]['bssid'] 939 dev[0].scan_for_bss(bssid, freq="2412") 940 dev[0].request("SET wps_cred_processing 2") 941 942 class TestDbusWps(TestDbus): 943 def __init__(self, bus): 944 TestDbus.__init__(self, bus) 945 self.credentials_received = False 946 947 def __enter__(self): 948 gobject.timeout_add(100, self.start_reg) 949 gobject.timeout_add(15000, self.timeout) 950 self.add_signal(self.wpsEvent, WPAS_DBUS_IFACE_WPS, "Event") 951 self.add_signal(self.credentials, WPAS_DBUS_IFACE_WPS, 952 "Credentials") 953 self.loop.run() 954 return self 955 956 def wpsEvent(self, name, args): 957 logger.debug("wpsEvent: %s args='%s'" % (name, str(args))) 958 959 def credentials(self, args): 960 logger.debug("credentials: " + str(args)) 961 self.credentials_received = True 962 self.loop.quit() 963 964 def start_reg(self, *args): 965 logger.debug("start_reg") 966 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 967 wps.Start({'Role': 'registrar', 'Type': 'pin', 968 'Pin': '12345670', 'Bssid': bssid_ay}) 969 return False 970 971 def success(self): 972 return self.credentials_received 973 974 with TestDbusWps(bus) as t: 975 if not t.success(): 976 raise Exception("Failure in D-Bus operations") 977 978 dev[0].wait_connected(timeout=10) 979 980def test_dbus_wps_cancel(dev, apdev): 981 """D-Bus WPS Cancel operation""" 982 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 983 wps = dbus.Interface(if_obj, WPAS_DBUS_IFACE_WPS) 984 985 hapd = start_ap(apdev[0]) 986 bssid = apdev[0]['bssid'] 987 988 wps.Cancel() 989 dev[0].scan_for_bss(bssid, freq="2412") 990 bssid_ay = dbus.ByteArray(binascii.unhexlify(bssid.replace(':', '').encode())) 991 wps.Start({'Role': 'enrollee', 'Type': 'pin', 'Pin': '12345670', 992 'Bssid': bssid_ay}) 993 wps.Cancel() 994 dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 1) 995 996def test_dbus_scan_invalid(dev, apdev): 997 """D-Bus invalid scan method""" 998 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 999 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1000 1001 tests = [({}, "InvalidArgs"), 1002 ({'Type': 123}, "InvalidArgs"), 1003 ({'Type': 'foo'}, "InvalidArgs"), 1004 ({'Type': 'active', 'Foo': 'bar'}, "InvalidArgs"), 1005 ({'Type': 'active', 'SSIDs': 'foo'}, "InvalidArgs"), 1006 ({'Type': 'active', 'SSIDs': ['foo']}, "InvalidArgs"), 1007 ({'Type': 'active', 1008 'SSIDs': [dbus.ByteArray(b"1"), dbus.ByteArray(b"2"), 1009 dbus.ByteArray(b"3"), dbus.ByteArray(b"4"), 1010 dbus.ByteArray(b"5"), dbus.ByteArray(b"6"), 1011 dbus.ByteArray(b"7"), dbus.ByteArray(b"8"), 1012 dbus.ByteArray(b"9"), dbus.ByteArray(b"10"), 1013 dbus.ByteArray(b"11"), dbus.ByteArray(b"12"), 1014 dbus.ByteArray(b"13"), dbus.ByteArray(b"14"), 1015 dbus.ByteArray(b"15"), dbus.ByteArray(b"16"), 1016 dbus.ByteArray(b"17")]}, 1017 "InvalidArgs"), 1018 ({'Type': 'active', 1019 'SSIDs': [dbus.ByteArray(b"1234567890abcdef1234567890abcdef1")]}, 1020 "InvalidArgs"), 1021 ({'Type': 'active', 'IEs': 'foo'}, "InvalidArgs"), 1022 ({'Type': 'active', 'IEs': ['foo']}, "InvalidArgs"), 1023 ({'Type': 'active', 'Channels': 2412}, "InvalidArgs"), 1024 ({'Type': 'active', 'Channels': [2412]}, "InvalidArgs"), 1025 ({'Type': 'active', 1026 'Channels': [(dbus.Int32(2412), dbus.UInt32(20))]}, 1027 "InvalidArgs"), 1028 ({'Type': 'active', 1029 'Channels': [(dbus.UInt32(2412), dbus.Int32(20))]}, 1030 "InvalidArgs"), 1031 ({'Type': 'active', 'AllowRoam': "yes"}, "InvalidArgs"), 1032 ({'Type': 'passive', 'IEs': [dbus.ByteArray(b"\xdd\x00")]}, 1033 "InvalidArgs"), 1034 ({'Type': 'passive', 'SSIDs': [dbus.ByteArray(b"foo")]}, 1035 "InvalidArgs")] 1036 for (t, err) in tests: 1037 try: 1038 iface.Scan(t) 1039 raise Exception("Invalid Scan() arguments accepted: " + str(t)) 1040 except dbus.exceptions.DBusException as e: 1041 if err not in str(e): 1042 raise Exception("Unexpected error message for invalid Scan(%s): %s" % (str(t), str(e))) 1043 1044def test_dbus_scan_oom(dev, apdev): 1045 """D-Bus scan method and OOM""" 1046 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1047 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1048 1049 with alloc_fail_dbus(dev[0], 1, 1050 "wpa_scan_clone_params;wpas_dbus_handler_scan", 1051 "Scan", expected="ScanError: Scan request rejected"): 1052 iface.Scan({'Type': 'passive', 1053 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1054 1055 with alloc_fail_dbus(dev[0], 1, 1056 "=wpas_dbus_get_scan_channels;wpas_dbus_handler_scan", 1057 "Scan"): 1058 iface.Scan({'Type': 'passive', 1059 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1060 1061 with alloc_fail_dbus(dev[0], 1, 1062 "=wpas_dbus_get_scan_ies;wpas_dbus_handler_scan", 1063 "Scan"): 1064 iface.Scan({'Type': 'active', 1065 'IEs': [dbus.ByteArray(b"\xdd\x00")], 1066 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1067 1068 with alloc_fail_dbus(dev[0], 1, 1069 "=wpas_dbus_get_scan_ssids;wpas_dbus_handler_scan", 1070 "Scan"): 1071 iface.Scan({'Type': 'active', 1072 'SSIDs': [dbus.ByteArray(b"open"), 1073 dbus.ByteArray()], 1074 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1075 1076def test_dbus_scan(dev, apdev): 1077 """D-Bus scan and related signals""" 1078 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1079 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1080 1081 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 1082 1083 class TestDbusScan(TestDbus): 1084 def __init__(self, bus): 1085 TestDbus.__init__(self, bus) 1086 self.scan_completed = 0 1087 self.bss_added = False 1088 self.fail_reason = None 1089 1090 def __enter__(self): 1091 gobject.timeout_add(1, self.run_scan) 1092 gobject.timeout_add(15000, self.timeout) 1093 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone") 1094 self.add_signal(self.bssAdded, WPAS_DBUS_IFACE, "BSSAdded") 1095 self.add_signal(self.bssRemoved, WPAS_DBUS_IFACE, "BSSRemoved") 1096 self.loop.run() 1097 return self 1098 1099 def scanDone(self, success): 1100 logger.debug("scanDone: success=%s" % success) 1101 self.scan_completed += 1 1102 if self.scan_completed == 1: 1103 iface.Scan({'Type': 'passive', 1104 'AllowRoam': True, 1105 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1106 elif self.scan_completed == 2: 1107 iface.Scan({'Type': 'passive', 1108 'AllowRoam': False}) 1109 elif self.bss_added and self.scan_completed == 3: 1110 self.loop.quit() 1111 1112 def bssAdded(self, bss, properties): 1113 logger.debug("bssAdded: %s" % bss) 1114 logger.debug(str(properties)) 1115 if 'WPS' in properties: 1116 if 'Type' in properties['WPS']: 1117 self.fail_reason = "Unexpected WPS dictionary entry in non-WPS BSS" 1118 self.loop.quit() 1119 self.bss_added = True 1120 if self.scan_completed == 3: 1121 self.loop.quit() 1122 1123 def bssRemoved(self, bss): 1124 logger.debug("bssRemoved: %s" % bss) 1125 1126 def run_scan(self, *args): 1127 logger.debug("run_scan") 1128 iface.Scan({'Type': 'active', 1129 'SSIDs': [dbus.ByteArray(b"open"), 1130 dbus.ByteArray()], 1131 'IEs': [dbus.ByteArray(b"\xdd\x00"), 1132 dbus.ByteArray()], 1133 'AllowRoam': False, 1134 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 1135 return False 1136 1137 def success(self): 1138 return self.scan_completed == 3 and self.bss_added 1139 1140 with TestDbusScan(bus) as t: 1141 if t.fail_reason: 1142 raise Exception(t.fail_reason) 1143 if not t.success(): 1144 raise Exception("Expected signals not seen") 1145 1146 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs", 1147 dbus_interface=dbus.PROPERTIES_IFACE) 1148 if len(res) < 1: 1149 raise Exception("Scan result not in BSSs property") 1150 iface.FlushBSS(0) 1151 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSs", 1152 dbus_interface=dbus.PROPERTIES_IFACE) 1153 if len(res) != 0: 1154 raise Exception("FlushBSS() did not remove scan results from BSSs property") 1155 iface.FlushBSS(1) 1156 1157def test_dbus_scan_rand(dev, apdev): 1158 """D-Bus MACAddressRandomizationMask property Get/Set""" 1159 try: 1160 run_dbus_scan_rand(dev, apdev) 1161 finally: 1162 dev[0].request("MAC_RAND_SCAN all enable=0") 1163 1164def run_dbus_scan_rand(dev, apdev): 1165 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1166 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1167 1168 res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1169 dbus_interface=dbus.PROPERTIES_IFACE) 1170 if len(res) != 0: 1171 logger.info(str(res)) 1172 raise Exception("Unexpected initial MACAddressRandomizationMask value") 1173 1174 try: 1175 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", "foo", 1176 dbus_interface=dbus.PROPERTIES_IFACE) 1177 raise Exception("Invalid Set accepted") 1178 except dbus.exceptions.DBusException as e: 1179 if "InvalidArgs: invalid message format" not in str(e): 1180 raise Exception("Unexpected error message: " + str(e)) 1181 1182 try: 1183 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1184 {"foo": "bar"}, 1185 dbus_interface=dbus.PROPERTIES_IFACE) 1186 raise Exception("Invalid Set accepted") 1187 except dbus.exceptions.DBusException as e: 1188 if "wpas_dbus_setter_mac_address_randomization_mask: mask was not a byte array" not in str(e): 1189 raise Exception("Unexpected error message: " + str(e)) 1190 1191 try: 1192 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1193 {"foo": dbus.ByteArray(b'123456')}, 1194 dbus_interface=dbus.PROPERTIES_IFACE) 1195 raise Exception("Invalid Set accepted") 1196 except dbus.exceptions.DBusException as e: 1197 if 'wpas_dbus_setter_mac_address_randomization_mask: bad scan type "foo"' not in str(e): 1198 raise Exception("Unexpected error message: " + str(e)) 1199 1200 try: 1201 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1202 {"scan": dbus.ByteArray(b'12345')}, 1203 dbus_interface=dbus.PROPERTIES_IFACE) 1204 raise Exception("Invalid Set accepted") 1205 except dbus.exceptions.DBusException as e: 1206 if 'wpas_dbus_setter_mac_address_randomization_mask: malformed MAC mask given' not in str(e): 1207 raise Exception("Unexpected error message: " + str(e)) 1208 1209 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1210 {"scan": dbus.ByteArray(b'123456')}, 1211 dbus_interface=dbus.PROPERTIES_IFACE) 1212 res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1213 dbus_interface=dbus.PROPERTIES_IFACE) 1214 if len(res) != 1: 1215 logger.info(str(res)) 1216 raise Exception("Unexpected MACAddressRandomizationMask value") 1217 1218 try: 1219 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1220 {"scan": dbus.ByteArray(b'123456'), 1221 "sched_scan": dbus.ByteArray(b'987654')}, 1222 dbus_interface=dbus.PROPERTIES_IFACE) 1223 except dbus.exceptions.DBusException as e: 1224 # sched_scan is unlikely to be supported 1225 pass 1226 1227 if_obj.Set(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1228 dbus.Dictionary({}, signature='sv'), 1229 dbus_interface=dbus.PROPERTIES_IFACE) 1230 res = if_obj.Get(WPAS_DBUS_IFACE, "MACAddressRandomizationMask", 1231 dbus_interface=dbus.PROPERTIES_IFACE) 1232 if len(res) != 0: 1233 logger.info(str(res)) 1234 raise Exception("Unexpected MACAddressRandomizationMask value") 1235 1236def test_dbus_scan_busy(dev, apdev): 1237 """D-Bus scan trigger rejection when busy with previous scan""" 1238 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1239 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1240 1241 if "OK" not in dev[0].request("SCAN freq=2412-2462"): 1242 raise Exception("Failed to start scan") 1243 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) 1244 if ev is None: 1245 raise Exception("Scan start timed out") 1246 1247 try: 1248 iface.Scan({'Type': 'active', 'AllowRoam': False}) 1249 raise Exception("Scan() accepted when busy") 1250 except dbus.exceptions.DBusException as e: 1251 if "ScanError: Scan request reject" not in str(e): 1252 raise Exception("Unexpected error message: " + str(e)) 1253 1254 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) 1255 if ev is None: 1256 raise Exception("Scan timed out") 1257 1258def test_dbus_scan_abort(dev, apdev): 1259 """D-Bus scan trigger and abort""" 1260 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1261 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1262 1263 iface.Scan({'Type': 'active', 'AllowRoam': False}) 1264 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-STARTED"], 15) 1265 if ev is None: 1266 raise Exception("Scan start timed out") 1267 1268 iface.AbortScan() 1269 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) 1270 if ev is None: 1271 raise Exception("Scan abort result timed out") 1272 dev[0].dump_monitor() 1273 iface.Scan({'Type': 'active', 'AllowRoam': False}) 1274 iface.AbortScan() 1275 1276 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-RESULTS"], 15) 1277 if ev is None: 1278 raise Exception("Scan timed out") 1279 1280def test_dbus_connect(dev, apdev): 1281 """D-Bus AddNetwork and connect""" 1282 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1283 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1284 1285 ssid = "test-wpa2-psk" 1286 passphrase = 'qwertyuiop' 1287 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 1288 hapd = hostapd.add_ap(apdev[0], params) 1289 1290 class TestDbusConnect(TestDbus): 1291 def __init__(self, bus): 1292 TestDbus.__init__(self, bus) 1293 self.network_added = False 1294 self.network_selected = False 1295 self.network_removed = False 1296 self.state = 0 1297 1298 def __enter__(self): 1299 gobject.timeout_add(1, self.run_connect) 1300 gobject.timeout_add(15000, self.timeout) 1301 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 1302 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, 1303 "NetworkRemoved") 1304 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 1305 "NetworkSelected") 1306 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1307 "PropertiesChanged") 1308 self.loop.run() 1309 return self 1310 1311 def networkAdded(self, network, properties): 1312 logger.debug("networkAdded: %s" % str(network)) 1313 logger.debug(str(properties)) 1314 self.network_added = True 1315 1316 def networkRemoved(self, network): 1317 logger.debug("networkRemoved: %s" % str(network)) 1318 self.network_removed = True 1319 1320 def networkSelected(self, network): 1321 logger.debug("networkSelected: %s" % str(network)) 1322 self.network_selected = True 1323 1324 def propertiesChanged(self, properties): 1325 logger.debug("propertiesChanged: %s" % str(properties)) 1326 if 'State' in properties and properties['State'] == "completed": 1327 if self.state == 0: 1328 self.state = 1 1329 iface.Disconnect() 1330 elif self.state == 2: 1331 self.state = 3 1332 iface.Disconnect() 1333 elif self.state == 4: 1334 self.state = 5 1335 iface.Reattach() 1336 elif self.state == 5: 1337 self.state = 6 1338 iface.Disconnect() 1339 elif self.state == 7: 1340 self.state = 8 1341 res = iface.SignalPoll() 1342 logger.debug("SignalPoll: " + str(res)) 1343 if 'frequency' not in res or res['frequency'] != 2412: 1344 self.state = -1 1345 logger.info("Unexpected SignalPoll result") 1346 iface.RemoveNetwork(self.netw) 1347 if 'State' in properties and properties['State'] == "disconnected": 1348 if self.state == 1: 1349 self.state = 2 1350 iface.SelectNetwork(self.netw) 1351 elif self.state == 3: 1352 self.state = 4 1353 iface.Reassociate() 1354 elif self.state == 6: 1355 self.state = 7 1356 iface.Reconnect() 1357 elif self.state == 8: 1358 self.state = 9 1359 self.loop.quit() 1360 1361 def run_connect(self, *args): 1362 logger.debug("run_connect") 1363 args = dbus.Dictionary({'ssid': ssid, 1364 'key_mgmt': 'WPA-PSK', 1365 'psk': passphrase, 1366 'scan_freq': 2412}, 1367 signature='sv') 1368 self.netw = iface.AddNetwork(args) 1369 iface.SelectNetwork(self.netw) 1370 return False 1371 1372 def success(self): 1373 if not self.network_added or \ 1374 not self.network_removed or \ 1375 not self.network_selected: 1376 return False 1377 return self.state == 9 1378 1379 with TestDbusConnect(bus) as t: 1380 if not t.success(): 1381 raise Exception("Expected signals not seen") 1382 1383def test_dbus_remove_connected(dev, apdev): 1384 """D-Bus RemoveAllNetworks while connected""" 1385 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1386 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1387 1388 ssid = "test-open" 1389 hapd = hostapd.add_ap(apdev[0], {"ssid": ssid}) 1390 1391 class TestDbusConnect(TestDbus): 1392 def __init__(self, bus): 1393 TestDbus.__init__(self, bus) 1394 self.network_added = False 1395 self.network_selected = False 1396 self.network_removed = False 1397 self.state = 0 1398 1399 def __enter__(self): 1400 gobject.timeout_add(1, self.run_connect) 1401 gobject.timeout_add(15000, self.timeout) 1402 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 1403 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, 1404 "NetworkRemoved") 1405 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 1406 "NetworkSelected") 1407 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1408 "PropertiesChanged") 1409 self.loop.run() 1410 return self 1411 1412 def networkAdded(self, network, properties): 1413 logger.debug("networkAdded: %s" % str(network)) 1414 logger.debug(str(properties)) 1415 self.network_added = True 1416 1417 def networkRemoved(self, network): 1418 logger.debug("networkRemoved: %s" % str(network)) 1419 self.network_removed = True 1420 1421 def networkSelected(self, network): 1422 logger.debug("networkSelected: %s" % str(network)) 1423 self.network_selected = True 1424 1425 def propertiesChanged(self, properties): 1426 logger.debug("propertiesChanged: %s" % str(properties)) 1427 if 'State' in properties and properties['State'] == "completed": 1428 if self.state == 0: 1429 self.state = 1 1430 iface.Disconnect() 1431 elif self.state == 2: 1432 self.state = 3 1433 iface.Disconnect() 1434 elif self.state == 4: 1435 self.state = 5 1436 iface.Reattach() 1437 elif self.state == 5: 1438 self.state = 6 1439 iface.Disconnect() 1440 elif self.state == 7: 1441 self.state = 8 1442 res = iface.SignalPoll() 1443 logger.debug("SignalPoll: " + str(res)) 1444 if 'frequency' not in res or res['frequency'] != 2412: 1445 self.state = -1 1446 logger.info("Unexpected SignalPoll result") 1447 iface.RemoveAllNetworks() 1448 if 'State' in properties and properties['State'] == "disconnected": 1449 if self.state == 1: 1450 self.state = 2 1451 iface.SelectNetwork(self.netw) 1452 elif self.state == 3: 1453 self.state = 4 1454 iface.Reassociate() 1455 elif self.state == 6: 1456 self.state = 7 1457 iface.Reconnect() 1458 elif self.state == 8: 1459 self.state = 9 1460 self.loop.quit() 1461 1462 def run_connect(self, *args): 1463 logger.debug("run_connect") 1464 args = dbus.Dictionary({'ssid': ssid, 1465 'key_mgmt': 'NONE', 1466 'scan_freq': 2412}, 1467 signature='sv') 1468 self.netw = iface.AddNetwork(args) 1469 iface.SelectNetwork(self.netw) 1470 return False 1471 1472 def success(self): 1473 if not self.network_added or \ 1474 not self.network_removed or \ 1475 not self.network_selected: 1476 return False 1477 return self.state == 9 1478 1479 with TestDbusConnect(bus) as t: 1480 if not t.success(): 1481 raise Exception("Expected signals not seen") 1482 1483def test_dbus_connect_psk_mem(dev, apdev): 1484 """D-Bus AddNetwork and connect with memory-only PSK""" 1485 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1486 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1487 1488 ssid = "test-wpa2-psk" 1489 passphrase = 'qwertyuiop' 1490 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 1491 hapd = hostapd.add_ap(apdev[0], params) 1492 1493 class TestDbusConnect(TestDbus): 1494 def __init__(self, bus): 1495 TestDbus.__init__(self, bus) 1496 self.connected = False 1497 1498 def __enter__(self): 1499 gobject.timeout_add(1, self.run_connect) 1500 gobject.timeout_add(15000, self.timeout) 1501 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1502 "PropertiesChanged") 1503 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE, 1504 "NetworkRequest") 1505 self.loop.run() 1506 return self 1507 1508 def propertiesChanged(self, properties): 1509 logger.debug("propertiesChanged: %s" % str(properties)) 1510 if 'State' in properties and properties['State'] == "completed": 1511 self.connected = True 1512 self.loop.quit() 1513 1514 def networkRequest(self, path, field, txt): 1515 logger.debug("networkRequest: %s %s %s" % (path, field, txt)) 1516 if field == "PSK_PASSPHRASE": 1517 iface.NetworkReply(path, field, '"' + passphrase + '"') 1518 1519 def run_connect(self, *args): 1520 logger.debug("run_connect") 1521 args = dbus.Dictionary({'ssid': ssid, 1522 'key_mgmt': 'WPA-PSK', 1523 'mem_only_psk': 1, 1524 'scan_freq': 2412}, 1525 signature='sv') 1526 self.netw = iface.AddNetwork(args) 1527 iface.SelectNetwork(self.netw) 1528 return False 1529 1530 def success(self): 1531 return self.connected 1532 1533 with TestDbusConnect(bus) as t: 1534 if not t.success(): 1535 raise Exception("Expected signals not seen") 1536 1537def test_dbus_connect_oom(dev, apdev): 1538 """D-Bus AddNetwork and connect when out-of-memory""" 1539 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1540 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1541 1542 if "OK" not in dev[0].request("TEST_ALLOC_FAIL 0:"): 1543 raise HwsimSkip("TEST_ALLOC_FAIL not supported in the build") 1544 1545 ssid = "test-wpa2-psk" 1546 passphrase = 'qwertyuiop' 1547 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 1548 hapd = hostapd.add_ap(apdev[0], params) 1549 1550 class TestDbusConnect(TestDbus): 1551 def __init__(self, bus): 1552 TestDbus.__init__(self, bus) 1553 self.network_added = False 1554 self.network_selected = False 1555 self.network_removed = False 1556 self.state = 0 1557 1558 def __enter__(self): 1559 gobject.timeout_add(1, self.run_connect) 1560 gobject.timeout_add(1500, self.timeout) 1561 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 1562 self.add_signal(self.networkRemoved, WPAS_DBUS_IFACE, 1563 "NetworkRemoved") 1564 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 1565 "NetworkSelected") 1566 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1567 "PropertiesChanged") 1568 self.loop.run() 1569 return self 1570 1571 def networkAdded(self, network, properties): 1572 logger.debug("networkAdded: %s" % str(network)) 1573 logger.debug(str(properties)) 1574 self.network_added = True 1575 1576 def networkRemoved(self, network): 1577 logger.debug("networkRemoved: %s" % str(network)) 1578 self.network_removed = True 1579 1580 def networkSelected(self, network): 1581 logger.debug("networkSelected: %s" % str(network)) 1582 self.network_selected = True 1583 1584 def propertiesChanged(self, properties): 1585 logger.debug("propertiesChanged: %s" % str(properties)) 1586 if 'State' in properties and properties['State'] == "completed": 1587 if self.state == 0: 1588 self.state = 1 1589 iface.Disconnect() 1590 elif self.state == 2: 1591 self.state = 3 1592 iface.Disconnect() 1593 elif self.state == 4: 1594 self.state = 5 1595 iface.Reattach() 1596 elif self.state == 5: 1597 self.state = 6 1598 res = iface.SignalPoll() 1599 logger.debug("SignalPoll: " + str(res)) 1600 if 'frequency' not in res or res['frequency'] != 2412: 1601 self.state = -1 1602 logger.info("Unexpected SignalPoll result") 1603 iface.RemoveNetwork(self.netw) 1604 if 'State' in properties and properties['State'] == "disconnected": 1605 if self.state == 1: 1606 self.state = 2 1607 iface.SelectNetwork(self.netw) 1608 elif self.state == 3: 1609 self.state = 4 1610 iface.Reassociate() 1611 elif self.state == 6: 1612 self.state = 7 1613 self.loop.quit() 1614 1615 def run_connect(self, *args): 1616 logger.debug("run_connect") 1617 args = dbus.Dictionary({'ssid': ssid, 1618 'key_mgmt': 'WPA-PSK', 1619 'psk': passphrase, 1620 'scan_freq': 2412}, 1621 signature='sv') 1622 try: 1623 self.netw = iface.AddNetwork(args) 1624 except Exception as e: 1625 logger.info("Exception on AddNetwork: " + str(e)) 1626 self.loop.quit() 1627 return False 1628 try: 1629 iface.SelectNetwork(self.netw) 1630 except Exception as e: 1631 logger.info("Exception on SelectNetwork: " + str(e)) 1632 self.loop.quit() 1633 1634 return False 1635 1636 def success(self): 1637 if not self.network_added or \ 1638 not self.network_removed or \ 1639 not self.network_selected: 1640 return False 1641 return self.state == 7 1642 1643 count = 0 1644 for i in range(1, 1000): 1645 for j in range(3): 1646 dev[j].dump_monitor() 1647 dev[0].request("TEST_ALLOC_FAIL %d:main" % i) 1648 try: 1649 with TestDbusConnect(bus) as t: 1650 if not t.success(): 1651 logger.info("Iteration %d - Expected signals not seen" % i) 1652 else: 1653 logger.info("Iteration %d - success" % i) 1654 1655 state = dev[0].request('GET_ALLOC_FAIL') 1656 logger.info("GET_ALLOC_FAIL: " + state) 1657 dev[0].dump_monitor() 1658 dev[0].request("TEST_ALLOC_FAIL 0:") 1659 if i < 3: 1660 raise Exception("Connection succeeded during out-of-memory") 1661 if not state.startswith('0:'): 1662 count += 1 1663 if count == 5: 1664 break 1665 except: 1666 pass 1667 1668 # Force regulatory update to re-fetch hw capabilities for the following 1669 # test cases. 1670 try: 1671 dev[0].dump_monitor() 1672 subprocess.call(['iw', 'reg', 'set', 'US']) 1673 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 1674 finally: 1675 dev[0].dump_monitor() 1676 subprocess.call(['iw', 'reg', 'set', '00']) 1677 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 1678 1679def test_dbus_while_not_connected(dev, apdev): 1680 """D-Bus invalid operations while not connected""" 1681 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1682 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1683 1684 try: 1685 iface.Disconnect() 1686 raise Exception("Disconnect() accepted when not connected") 1687 except dbus.exceptions.DBusException as e: 1688 if "NotConnected" not in str(e): 1689 raise Exception("Unexpected error message for invalid Disconnect: " + str(e)) 1690 1691 try: 1692 iface.Reattach() 1693 raise Exception("Reattach() accepted when not connected") 1694 except dbus.exceptions.DBusException as e: 1695 if "NotConnected" not in str(e): 1696 raise Exception("Unexpected error message for invalid Reattach: " + str(e)) 1697 1698def test_dbus_connect_eap(dev, apdev): 1699 """D-Bus AddNetwork and connect to EAP network""" 1700 check_altsubject_match_support(dev[0]) 1701 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1702 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1703 1704 ssid = "ieee8021x-open" 1705 params = hostapd.radius_params() 1706 params["ssid"] = ssid 1707 params["ieee8021x"] = "1" 1708 hapd = hostapd.add_ap(apdev[0], params) 1709 1710 class TestDbusConnect(TestDbus): 1711 def __init__(self, bus): 1712 TestDbus.__init__(self, bus) 1713 self.certification_received = False 1714 self.eap_status = False 1715 self.state = 0 1716 1717 def __enter__(self): 1718 gobject.timeout_add(1, self.run_connect) 1719 gobject.timeout_add(15000, self.timeout) 1720 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 1721 "PropertiesChanged") 1722 self.add_signal(self.certification, WPAS_DBUS_IFACE, 1723 "Certification", byte_arrays=True) 1724 self.add_signal(self.networkRequest, WPAS_DBUS_IFACE, 1725 "NetworkRequest") 1726 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP") 1727 self.loop.run() 1728 return self 1729 1730 def propertiesChanged(self, properties): 1731 logger.debug("propertiesChanged: %s" % str(properties)) 1732 if 'State' in properties and properties['State'] == "completed": 1733 if self.state == 0: 1734 self.state = 1 1735 iface.EAPLogoff() 1736 logger.info("Set dNSName constraint") 1737 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw) 1738 args = dbus.Dictionary({'altsubject_match': 1739 self.server_dnsname}, 1740 signature='sv') 1741 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args, 1742 dbus_interface=dbus.PROPERTIES_IFACE) 1743 elif self.state == 2: 1744 self.state = 3 1745 iface.Disconnect() 1746 logger.info("Set non-matching dNSName constraint") 1747 net_obj = bus.get_object(WPAS_DBUS_SERVICE, self.netw) 1748 args = dbus.Dictionary({'altsubject_match': 1749 self.server_dnsname + "FOO"}, 1750 signature='sv') 1751 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args, 1752 dbus_interface=dbus.PROPERTIES_IFACE) 1753 if 'State' in properties and properties['State'] == "disconnected": 1754 if self.state == 1: 1755 self.state = 2 1756 iface.EAPLogon() 1757 iface.SelectNetwork(self.netw) 1758 if self.state == 3: 1759 self.state = 4 1760 iface.SelectNetwork(self.netw) 1761 1762 def certification(self, args): 1763 logger.debug("certification: %s" % str(args)) 1764 self.certification_received = True 1765 if args['depth'] == 0: 1766 # The test server certificate is supposed to have dNSName 1767 if len(args['altsubject']) < 1: 1768 raise Exception("Missing dNSName") 1769 dnsname = args['altsubject'][0] 1770 if not dnsname.startswith("DNS:"): 1771 raise Exception("Expected dNSName not found: " + dnsname) 1772 logger.info("altsubject: " + dnsname) 1773 self.server_dnsname = dnsname 1774 1775 def eap(self, status, parameter): 1776 logger.debug("EAP: status=%s parameter=%s" % (status, parameter)) 1777 if status == 'completion' and parameter == 'success': 1778 self.eap_status = True 1779 if self.state == 4 and status == 'remote certificate verification' and parameter == 'AltSubject mismatch': 1780 self.state = 5 1781 self.loop.quit() 1782 1783 def networkRequest(self, path, field, txt): 1784 logger.debug("networkRequest: %s %s %s" % (path, field, txt)) 1785 if field == "PASSWORD": 1786 iface.NetworkReply(path, field, "password") 1787 1788 def run_connect(self, *args): 1789 logger.debug("run_connect") 1790 args = dbus.Dictionary({'ssid': ssid, 1791 'key_mgmt': 'IEEE8021X', 1792 'eapol_flags': 0, 1793 'eap': 'TTLS', 1794 'anonymous_identity': 'ttls', 1795 'identity': 'pap user', 1796 'ca_cert': 'auth_serv/ca.pem', 1797 'phase2': 'auth=PAP', 1798 'scan_freq': 2412}, 1799 signature='sv') 1800 self.netw = iface.AddNetwork(args) 1801 iface.SelectNetwork(self.netw) 1802 return False 1803 1804 def success(self): 1805 if not self.eap_status or not self.certification_received: 1806 return False 1807 return self.state == 5 1808 1809 with TestDbusConnect(bus) as t: 1810 if not t.success(): 1811 raise Exception("Expected signals not seen") 1812 1813def test_dbus_network(dev, apdev): 1814 """D-Bus AddNetwork/RemoveNetwork parameters and error cases""" 1815 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1816 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1817 1818 args = dbus.Dictionary({'ssid': "foo", 1819 'key_mgmt': 'WPA-PSK', 1820 'psk': "12345678", 1821 'identity': dbus.ByteArray([1, 2]), 1822 'priority': dbus.Int32(0), 1823 'scan_freq': dbus.UInt32(2412)}, 1824 signature='sv') 1825 netw = iface.AddNetwork(args) 1826 id = int(dev[0].list_networks()[0]['id']) 1827 val = dev[0].get_network(id, "scan_freq") 1828 if val != "2412": 1829 raise Exception("Invalid scan_freq value: " + str(val)) 1830 iface.RemoveNetwork(netw) 1831 1832 args = dbus.Dictionary({'ssid': "foo", 1833 'key_mgmt': 'NONE', 1834 'scan_freq': "2412 2432", 1835 'freq_list': "2412 2417 2432"}, 1836 signature='sv') 1837 netw = iface.AddNetwork(args) 1838 id = int(dev[0].list_networks()[0]['id']) 1839 val = dev[0].get_network(id, "scan_freq") 1840 if val != "2412 2432": 1841 raise Exception("Invalid scan_freq value (2): " + str(val)) 1842 val = dev[0].get_network(id, "freq_list") 1843 if val != "2412 2417 2432": 1844 raise Exception("Invalid freq_list value: " + str(val)) 1845 iface.RemoveNetwork(netw) 1846 try: 1847 iface.RemoveNetwork(netw) 1848 raise Exception("Invalid RemoveNetwork() accepted") 1849 except dbus.exceptions.DBusException as e: 1850 if "NetworkUnknown" not in str(e): 1851 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e)) 1852 try: 1853 iface.SelectNetwork(netw) 1854 raise Exception("Invalid SelectNetwork() accepted") 1855 except dbus.exceptions.DBusException as e: 1856 if "NetworkUnknown" not in str(e): 1857 raise Exception("Unexpected error message for invalid RemoveNetwork: " + str(e)) 1858 1859 args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE', 1860 'identity': "testuser", 'scan_freq': '2412'}, 1861 signature='sv') 1862 netw1 = iface.AddNetwork(args) 1863 args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'}, 1864 signature='sv') 1865 netw2 = iface.AddNetwork(args) 1866 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks", 1867 dbus_interface=dbus.PROPERTIES_IFACE) 1868 if len(res) != 2: 1869 raise Exception("Unexpected number of networks") 1870 1871 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1) 1872 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled", 1873 dbus_interface=dbus.PROPERTIES_IFACE) 1874 if res != False: 1875 raise Exception("Added network was unexpectedly enabled by default") 1876 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(True), 1877 dbus_interface=dbus.PROPERTIES_IFACE) 1878 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled", 1879 dbus_interface=dbus.PROPERTIES_IFACE) 1880 if res != True: 1881 raise Exception("Set(Enabled,True) did not seem to change property value") 1882 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.Boolean(False), 1883 dbus_interface=dbus.PROPERTIES_IFACE) 1884 res = net_obj.Get(WPAS_DBUS_NETWORK, "Enabled", 1885 dbus_interface=dbus.PROPERTIES_IFACE) 1886 if res != False: 1887 raise Exception("Set(Enabled,False) did not seem to change property value") 1888 try: 1889 net_obj.Set(WPAS_DBUS_NETWORK, "Enabled", dbus.UInt32(1), 1890 dbus_interface=dbus.PROPERTIES_IFACE) 1891 raise Exception("Invalid Set(Enabled,1) accepted") 1892 except dbus.exceptions.DBusException as e: 1893 if "Error.Failed: wrong property type" not in str(e): 1894 raise Exception("Unexpected error message for invalid Set(Enabled,1): " + str(e)) 1895 1896 args = dbus.Dictionary({'ssid': "foo1new"}, signature='sv') 1897 net_obj.Set(WPAS_DBUS_NETWORK, "Properties", args, 1898 dbus_interface=dbus.PROPERTIES_IFACE) 1899 res = net_obj.Get(WPAS_DBUS_NETWORK, "Properties", 1900 dbus_interface=dbus.PROPERTIES_IFACE) 1901 if res['ssid'] != '"foo1new"': 1902 raise Exception("Set(Properties) failed to update ssid") 1903 if res['identity'] != '"testuser"': 1904 raise Exception("Set(Properties) unexpectedly changed unrelated parameter") 1905 1906 iface.RemoveAllNetworks() 1907 res = if_obj.Get(WPAS_DBUS_IFACE, "Networks", 1908 dbus_interface=dbus.PROPERTIES_IFACE) 1909 if len(res) != 0: 1910 raise Exception("Unexpected number of networks") 1911 iface.RemoveAllNetworks() 1912 1913 tests = [dbus.Dictionary({'psk': "1234567"}, signature='sv'), 1914 dbus.Dictionary({'identity': dbus.ByteArray()}, 1915 signature='sv'), 1916 dbus.Dictionary({'identity': dbus.Byte(1)}, signature='sv')] 1917 for args in tests: 1918 try: 1919 iface.AddNetwork(args) 1920 raise Exception("Invalid AddNetwork args accepted: " + str(args)) 1921 except dbus.exceptions.DBusException as e: 1922 if "InvalidArgs" not in str(e): 1923 raise Exception("Unexpected error message for invalid AddNetwork: " + str(e)) 1924 1925def test_dbus_network_oom(dev, apdev): 1926 """D-Bus AddNetwork/RemoveNetwork parameters and OOM error cases""" 1927 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 1928 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 1929 1930 args = dbus.Dictionary({'ssid': "foo1", 'key_mgmt': 'NONE', 1931 'identity': "testuser", 'scan_freq': '2412'}, 1932 signature='sv') 1933 netw1 = iface.AddNetwork(args) 1934 net_obj = bus.get_object(WPAS_DBUS_SERVICE, netw1) 1935 1936 with alloc_fail_dbus(dev[0], 1, 1937 "wpa_config_get_all;wpas_dbus_getter_network_properties", 1938 "Get"): 1939 net_obj.Get(WPAS_DBUS_NETWORK, "Properties", 1940 dbus_interface=dbus.PROPERTIES_IFACE) 1941 1942 iface.RemoveAllNetworks() 1943 1944 with alloc_fail_dbus(dev[0], 1, 1945 "wpas_dbus_new_decompose_object_path;wpas_dbus_handler_remove_network", 1946 "RemoveNetwork", "InvalidArgs"): 1947 iface.RemoveNetwork(dbus.ObjectPath("/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234")) 1948 1949 with alloc_fail(dev[0], 1, "wpa_dbus_register_object_per_iface;wpas_dbus_register_network"): 1950 args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'}, 1951 signature='sv') 1952 try: 1953 netw = iface.AddNetwork(args) 1954 # Currently, AddNetwork() succeeds even if os_strdup() for path 1955 # fails, so remove the network if that occurs. 1956 iface.RemoveNetwork(netw) 1957 except dbus.exceptions.DBusException as e: 1958 pass 1959 1960 for i in range(1, 3): 1961 with alloc_fail(dev[0], i, "=wpas_dbus_register_network"): 1962 try: 1963 netw = iface.AddNetwork(args) 1964 # Currently, AddNetwork() succeeds even if network registration 1965 # fails, so remove the network if that occurs. 1966 iface.RemoveNetwork(netw) 1967 except dbus.exceptions.DBusException as e: 1968 pass 1969 1970 with alloc_fail_dbus(dev[0], 1, 1971 "=wpa_config_add_network;wpas_dbus_handler_add_network", 1972 "AddNetwork", 1973 "UnknownError: wpa_supplicant could not add a network"): 1974 args = dbus.Dictionary({'ssid': "foo2", 'key_mgmt': 'NONE'}, 1975 signature='sv') 1976 netw = iface.AddNetwork(args) 1977 1978 tests = [(1, 1979 'wpa_dbus_dict_get_entry;set_network_properties;wpas_dbus_handler_add_network', 1980 dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, 1981 signature='sv')), 1982 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1983 dbus.Dictionary({'ssid': 'foo'}, signature='sv')), 1984 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1985 dbus.Dictionary({'eap': 'foo'}, signature='sv')), 1986 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1987 dbus.Dictionary({'priority': dbus.UInt32(1)}, 1988 signature='sv')), 1989 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1990 dbus.Dictionary({'priority': dbus.Int32(1)}, 1991 signature='sv')), 1992 (1, '=set_network_properties;wpas_dbus_handler_add_network', 1993 dbus.Dictionary({'ssid': dbus.ByteArray(b' ')}, 1994 signature='sv'))] 1995 for (count, funcs, args) in tests: 1996 with alloc_fail_dbus(dev[0], count, funcs, "AddNetwork", "InvalidArgs"): 1997 netw = iface.AddNetwork(args) 1998 1999 if len(if_obj.Get(WPAS_DBUS_IFACE, 'Networks', 2000 dbus_interface=dbus.PROPERTIES_IFACE)) > 0: 2001 raise Exception("Unexpected network block added") 2002 if len(dev[0].list_networks()) > 0: 2003 raise Exception("Unexpected network block visible") 2004 2005def test_dbus_interface(dev, apdev): 2006 """D-Bus CreateInterface/GetInterface/RemoveInterface parameters and error cases""" 2007 try: 2008 _test_dbus_interface(dev, apdev) 2009 finally: 2010 # Need to force P2P channel list update since the 'lo' interface 2011 # with driver=none ends up configuring default dualband channels. 2012 dev[0].request("SET country US") 2013 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2014 if ev is None: 2015 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], 2016 timeout=1) 2017 dev[0].request("SET country 00") 2018 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2019 if ev is None: 2020 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], 2021 timeout=1) 2022 subprocess.call(['iw', 'reg', 'set', '00']) 2023 2024def _test_dbus_interface(dev, apdev): 2025 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2026 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 2027 2028 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'}, 2029 signature='sv') 2030 path = wpas.CreateInterface(params) 2031 logger.debug("New interface path: " + str(path)) 2032 path2 = wpas.GetInterface("lo") 2033 if path != path2: 2034 raise Exception("Interface object mismatch") 2035 2036 params = dbus.Dictionary({'Ifname': 'lo', 2037 'Driver': 'none', 2038 'ConfigFile': 'foo', 2039 'BridgeIfname': 'foo',}, 2040 signature='sv') 2041 try: 2042 wpas.CreateInterface(params) 2043 raise Exception("Invalid CreateInterface() accepted") 2044 except dbus.exceptions.DBusException as e: 2045 if "InterfaceExists" not in str(e): 2046 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2047 2048 wpas.RemoveInterface(path) 2049 try: 2050 wpas.RemoveInterface(path) 2051 raise Exception("Invalid RemoveInterface() accepted") 2052 except dbus.exceptions.DBusException as e: 2053 if "InterfaceUnknown" not in str(e): 2054 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e)) 2055 2056 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none', 2057 'Foo': 123}, 2058 signature='sv') 2059 try: 2060 wpas.CreateInterface(params) 2061 raise Exception("Invalid CreateInterface() accepted") 2062 except dbus.exceptions.DBusException as e: 2063 if "InvalidArgs" not in str(e): 2064 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2065 2066 params = dbus.Dictionary({'Driver': 'none'}, signature='sv') 2067 try: 2068 wpas.CreateInterface(params) 2069 raise Exception("Invalid CreateInterface() accepted") 2070 except dbus.exceptions.DBusException as e: 2071 if "InvalidArgs" not in str(e): 2072 raise Exception("Unexpected error message for invalid CreateInterface: " + str(e)) 2073 2074 try: 2075 wpas.GetInterface("lo") 2076 raise Exception("Invalid GetInterface() accepted") 2077 except dbus.exceptions.DBusException as e: 2078 if "InterfaceUnknown" not in str(e): 2079 raise Exception("Unexpected error message for invalid RemoveInterface: " + str(e)) 2080 2081def test_dbus_interface_oom(dev, apdev): 2082 """D-Bus CreateInterface/GetInterface/RemoveInterface OOM error cases""" 2083 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2084 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 2085 2086 with alloc_fail_dbus(dev[0], 1, "wpa_dbus_dict_get_entry;wpas_dbus_handler_create_interface", "CreateInterface", "InvalidArgs"): 2087 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'}, 2088 signature='sv') 2089 wpas.CreateInterface(params) 2090 2091 for i in range(1, 1000): 2092 dev[0].request("TEST_ALLOC_FAIL %d:wpa_supplicant_add_iface;wpas_dbus_handler_create_interface" % i) 2093 params = dbus.Dictionary({'Ifname': 'lo', 'Driver': 'none'}, 2094 signature='sv') 2095 try: 2096 npath = wpas.CreateInterface(params) 2097 wpas.RemoveInterface(npath) 2098 logger.info("CreateInterface succeeds after %d allocation failures" % i) 2099 state = dev[0].request('GET_ALLOC_FAIL') 2100 logger.info("GET_ALLOC_FAIL: " + state) 2101 dev[0].dump_monitor() 2102 dev[0].request("TEST_ALLOC_FAIL 0:") 2103 if i < 5: 2104 raise Exception("CreateInterface succeeded during out-of-memory") 2105 if not state.startswith('0:'): 2106 break 2107 except dbus.exceptions.DBusException as e: 2108 pass 2109 2110 for arg in ['Driver', 'Ifname', 'ConfigFile', 'BridgeIfname']: 2111 with alloc_fail_dbus(dev[0], 1, "=wpas_dbus_handler_create_interface", 2112 "CreateInterface"): 2113 params = dbus.Dictionary({arg: 'foo'}, signature='sv') 2114 wpas.CreateInterface(params) 2115 2116def test_dbus_blob(dev, apdev): 2117 """D-Bus AddNetwork/RemoveNetwork parameters and error cases""" 2118 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2119 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2120 2121 blob = dbus.ByteArray(b"\x01\x02\x03") 2122 iface.AddBlob('blob1', blob) 2123 try: 2124 iface.AddBlob('blob1', dbus.ByteArray(b"\x01\x02\x04")) 2125 raise Exception("Invalid AddBlob() accepted") 2126 except dbus.exceptions.DBusException as e: 2127 if "BlobExists" not in str(e): 2128 raise Exception("Unexpected error message for invalid AddBlob: " + str(e)) 2129 res = iface.GetBlob('blob1') 2130 if len(res) != len(blob): 2131 raise Exception("Unexpected blob data length") 2132 for i in range(len(res)): 2133 if res[i] != dbus.Byte(blob[i]): 2134 raise Exception("Unexpected blob data") 2135 res = if_obj.Get(WPAS_DBUS_IFACE, "Blobs", 2136 dbus_interface=dbus.PROPERTIES_IFACE) 2137 if 'blob1' not in res: 2138 raise Exception("Added blob missing from Blobs property") 2139 iface.RemoveBlob('blob1') 2140 try: 2141 iface.RemoveBlob('blob1') 2142 raise Exception("Invalid RemoveBlob() accepted") 2143 except dbus.exceptions.DBusException as e: 2144 if "BlobUnknown" not in str(e): 2145 raise Exception("Unexpected error message for invalid RemoveBlob: " + str(e)) 2146 try: 2147 iface.GetBlob('blob1') 2148 raise Exception("Invalid GetBlob() accepted") 2149 except dbus.exceptions.DBusException as e: 2150 if "BlobUnknown" not in str(e): 2151 raise Exception("Unexpected error message for invalid GetBlob: " + str(e)) 2152 2153 class TestDbusBlob(TestDbus): 2154 def __init__(self, bus): 2155 TestDbus.__init__(self, bus) 2156 self.blob_added = False 2157 self.blob_removed = False 2158 2159 def __enter__(self): 2160 gobject.timeout_add(1, self.run_blob) 2161 gobject.timeout_add(15000, self.timeout) 2162 self.add_signal(self.blobAdded, WPAS_DBUS_IFACE, "BlobAdded") 2163 self.add_signal(self.blobRemoved, WPAS_DBUS_IFACE, "BlobRemoved") 2164 self.loop.run() 2165 return self 2166 2167 def blobAdded(self, blobName): 2168 logger.debug("blobAdded: %s" % blobName) 2169 if blobName == 'blob2': 2170 self.blob_added = True 2171 2172 def blobRemoved(self, blobName): 2173 logger.debug("blobRemoved: %s" % blobName) 2174 if blobName == 'blob2': 2175 self.blob_removed = True 2176 self.loop.quit() 2177 2178 def run_blob(self, *args): 2179 logger.debug("run_blob") 2180 iface.AddBlob('blob2', dbus.ByteArray(b"\x01\x02\x04")) 2181 iface.RemoveBlob('blob2') 2182 return False 2183 2184 def success(self): 2185 return self.blob_added and self.blob_removed 2186 2187 with TestDbusBlob(bus) as t: 2188 if not t.success(): 2189 raise Exception("Expected signals not seen") 2190 2191def test_dbus_blob_oom(dev, apdev): 2192 """D-Bus AddNetwork/RemoveNetwork OOM error cases""" 2193 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2194 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2195 2196 for i in range(1, 4): 2197 with alloc_fail_dbus(dev[0], i, "wpas_dbus_handler_add_blob", 2198 "AddBlob"): 2199 iface.AddBlob('blob_no_mem', dbus.ByteArray(b"\x01\x02\x03\x04")) 2200 2201def test_dbus_autoscan(dev, apdev): 2202 """D-Bus Autoscan()""" 2203 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2204 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2205 2206 iface.AutoScan("foo") 2207 iface.AutoScan("periodic:1") 2208 iface.AutoScan("") 2209 dev[0].request("AUTOSCAN ") 2210 2211def test_dbus_autoscan_oom(dev, apdev): 2212 """D-Bus Autoscan() OOM""" 2213 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2214 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2215 2216 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_autoscan", "AutoScan"): 2217 iface.AutoScan("foo") 2218 dev[0].request("AUTOSCAN ") 2219 2220def test_dbus_tdls_invalid(dev, apdev): 2221 """D-Bus invalid TDLS operations""" 2222 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2223 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2224 2225 hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"}) 2226 connect_2sta_open(dev, hapd) 2227 addr1 = dev[1].p2p_interface_addr() 2228 2229 try: 2230 iface.TDLSDiscover("foo") 2231 raise Exception("Invalid TDLSDiscover() accepted") 2232 except dbus.exceptions.DBusException as e: 2233 if "InvalidArgs" not in str(e): 2234 raise Exception("Unexpected error message for invalid TDLSDiscover: " + str(e)) 2235 2236 try: 2237 iface.TDLSStatus("foo") 2238 raise Exception("Invalid TDLSStatus() accepted") 2239 except dbus.exceptions.DBusException as e: 2240 if "InvalidArgs" not in str(e): 2241 raise Exception("Unexpected error message for invalid TDLSStatus: " + str(e)) 2242 2243 res = iface.TDLSStatus(addr1) 2244 if res != "peer does not exist": 2245 raise Exception("Unexpected TDLSStatus response") 2246 2247 try: 2248 iface.TDLSSetup("foo") 2249 raise Exception("Invalid TDLSSetup() accepted") 2250 except dbus.exceptions.DBusException as e: 2251 if "InvalidArgs" not in str(e): 2252 raise Exception("Unexpected error message for invalid TDLSSetup: " + str(e)) 2253 2254 try: 2255 iface.TDLSTeardown("foo") 2256 raise Exception("Invalid TDLSTeardown() accepted") 2257 except dbus.exceptions.DBusException as e: 2258 if "InvalidArgs" not in str(e): 2259 raise Exception("Unexpected error message for invalid TDLSTeardown: " + str(e)) 2260 2261 try: 2262 iface.TDLSTeardown("00:11:22:33:44:55") 2263 raise Exception("TDLSTeardown accepted for unknown peer") 2264 except dbus.exceptions.DBusException as e: 2265 if "UnknownError: error performing TDLS teardown" not in str(e): 2266 raise Exception("Unexpected error message: " + str(e)) 2267 2268 try: 2269 iface.TDLSChannelSwitch({}) 2270 raise Exception("Invalid TDLSChannelSwitch() accepted") 2271 except dbus.exceptions.DBusException as e: 2272 if "InvalidArgs" not in str(e): 2273 raise Exception("Unexpected error message for invalid TDLSChannelSwitch: " + str(e)) 2274 2275 try: 2276 iface.TDLSCancelChannelSwitch("foo") 2277 raise Exception("Invalid TDLSCancelChannelSwitch() accepted") 2278 except dbus.exceptions.DBusException as e: 2279 if "InvalidArgs" not in str(e): 2280 raise Exception("Unexpected error message for invalid TDLSCancelChannelSwitch: " + str(e)) 2281 2282def test_dbus_tdls_oom(dev, apdev): 2283 """D-Bus TDLS operations during OOM""" 2284 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2285 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2286 2287 with alloc_fail_dbus(dev[0], 1, "wpa_tdls_add_peer", "TDLSSetup", 2288 "UnknownError: error performing TDLS setup"): 2289 iface.TDLSSetup("00:11:22:33:44:55") 2290 2291def test_dbus_tdls(dev, apdev): 2292 """D-Bus TDLS""" 2293 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2294 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2295 2296 hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"}) 2297 connect_2sta_open(dev, hapd) 2298 2299 addr1 = dev[1].p2p_interface_addr() 2300 2301 class TestDbusTdls(TestDbus): 2302 def __init__(self, bus): 2303 TestDbus.__init__(self, bus) 2304 self.tdls_setup = False 2305 self.tdls_teardown = False 2306 2307 def __enter__(self): 2308 gobject.timeout_add(1, self.run_tdls) 2309 gobject.timeout_add(15000, self.timeout) 2310 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 2311 "PropertiesChanged") 2312 self.loop.run() 2313 return self 2314 2315 def propertiesChanged(self, properties): 2316 logger.debug("propertiesChanged: %s" % str(properties)) 2317 2318 def run_tdls(self, *args): 2319 logger.debug("run_tdls") 2320 iface.TDLSDiscover(addr1) 2321 gobject.timeout_add(100, self.run_tdls2) 2322 return False 2323 2324 def run_tdls2(self, *args): 2325 logger.debug("run_tdls2") 2326 iface.TDLSSetup(addr1) 2327 gobject.timeout_add(500, self.run_tdls3) 2328 return False 2329 2330 def run_tdls3(self, *args): 2331 logger.debug("run_tdls3") 2332 res = iface.TDLSStatus(addr1) 2333 if res == "connected": 2334 self.tdls_setup = True 2335 else: 2336 logger.info("Unexpected TDLSStatus: " + res) 2337 iface.TDLSTeardown(addr1) 2338 gobject.timeout_add(200, self.run_tdls4) 2339 return False 2340 2341 def run_tdls4(self, *args): 2342 logger.debug("run_tdls4") 2343 res = iface.TDLSStatus(addr1) 2344 if res == "peer does not exist": 2345 self.tdls_teardown = True 2346 else: 2347 logger.info("Unexpected TDLSStatus: " + res) 2348 self.loop.quit() 2349 return False 2350 2351 def success(self): 2352 return self.tdls_setup and self.tdls_teardown 2353 2354 with TestDbusTdls(bus) as t: 2355 if not t.success(): 2356 raise Exception("Expected signals not seen") 2357 2358def test_dbus_tdls_channel_switch(dev, apdev): 2359 """D-Bus TDLS channel switch configuration""" 2360 flags = int(dev[0].get_driver_status_field('capa.flags'), 16) 2361 if flags & 0x800000000 == 0: 2362 raise HwsimSkip("Driver does not support TDLS channel switching") 2363 2364 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2365 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2366 2367 hapd = hostapd.add_ap(apdev[0], {"ssid": "test-open"}) 2368 connect_2sta_open(dev, hapd) 2369 2370 addr1 = dev[1].p2p_interface_addr() 2371 2372 class TestDbusTdls(TestDbus): 2373 def __init__(self, bus): 2374 TestDbus.__init__(self, bus) 2375 self.tdls_setup = False 2376 self.tdls_done = False 2377 2378 def __enter__(self): 2379 gobject.timeout_add(1, self.run_tdls) 2380 gobject.timeout_add(15000, self.timeout) 2381 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 2382 "PropertiesChanged") 2383 self.loop.run() 2384 return self 2385 2386 def propertiesChanged(self, properties): 2387 logger.debug("propertiesChanged: %s" % str(properties)) 2388 2389 def run_tdls(self, *args): 2390 logger.debug("run_tdls") 2391 iface.TDLSDiscover(addr1) 2392 gobject.timeout_add(100, self.run_tdls2) 2393 return False 2394 2395 def run_tdls2(self, *args): 2396 logger.debug("run_tdls2") 2397 iface.TDLSSetup(addr1) 2398 gobject.timeout_add(500, self.run_tdls3) 2399 return False 2400 2401 def run_tdls3(self, *args): 2402 logger.debug("run_tdls3") 2403 res = iface.TDLSStatus(addr1) 2404 if res == "connected": 2405 self.tdls_setup = True 2406 else: 2407 logger.info("Unexpected TDLSStatus: " + res) 2408 2409 # Unknown dict entry 2410 args = dbus.Dictionary({'Foobar': dbus.Byte(1)}, 2411 signature='sv') 2412 try: 2413 iface.TDLSChannelSwitch(args) 2414 except Exception as e: 2415 if "InvalidArgs" not in str(e): 2416 raise Exception("Unexpected exception") 2417 2418 # Missing OperClass 2419 args = dbus.Dictionary({}, signature='sv') 2420 try: 2421 iface.TDLSChannelSwitch(args) 2422 except Exception as e: 2423 if "InvalidArgs" not in str(e): 2424 raise Exception("Unexpected exception") 2425 2426 # Missing Frequency 2427 args = dbus.Dictionary({'OperClass': dbus.Byte(1)}, 2428 signature='sv') 2429 try: 2430 iface.TDLSChannelSwitch(args) 2431 except Exception as e: 2432 if "InvalidArgs" not in str(e): 2433 raise Exception("Unexpected exception") 2434 2435 # Missing PeerAddress 2436 args = dbus.Dictionary({'OperClass': dbus.Byte(1), 2437 'Frequency': dbus.UInt32(2417)}, 2438 signature='sv') 2439 try: 2440 iface.TDLSChannelSwitch(args) 2441 except Exception as e: 2442 if "InvalidArgs" not in str(e): 2443 raise Exception("Unexpected exception") 2444 2445 # Valid parameters 2446 args = dbus.Dictionary({'OperClass': dbus.Byte(1), 2447 'Frequency': dbus.UInt32(2417), 2448 'PeerAddress': addr1, 2449 'SecChannelOffset': dbus.UInt32(0), 2450 'CenterFrequency1': dbus.UInt32(0), 2451 'CenterFrequency2': dbus.UInt32(0), 2452 'Bandwidth': dbus.UInt32(20), 2453 'HT': dbus.Boolean(False), 2454 'VHT': dbus.Boolean(False)}, 2455 signature='sv') 2456 iface.TDLSChannelSwitch(args) 2457 2458 gobject.timeout_add(200, self.run_tdls4) 2459 return False 2460 2461 def run_tdls4(self, *args): 2462 logger.debug("run_tdls4") 2463 iface.TDLSCancelChannelSwitch(addr1) 2464 self.tdls_done = True 2465 self.loop.quit() 2466 return False 2467 2468 def success(self): 2469 return self.tdls_setup and self.tdls_done 2470 2471 with TestDbusTdls(bus) as t: 2472 if not t.success(): 2473 raise Exception("Expected signals not seen") 2474 2475def test_dbus_pkcs11(dev, apdev): 2476 """D-Bus SetPKCS11EngineAndModulePath()""" 2477 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2478 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2479 2480 try: 2481 iface.SetPKCS11EngineAndModulePath("foo", "bar") 2482 except dbus.exceptions.DBusException as e: 2483 if "Error.Failed: Reinit of the EAPOL" not in str(e): 2484 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e)) 2485 2486 try: 2487 iface.SetPKCS11EngineAndModulePath("foo", "") 2488 except dbus.exceptions.DBusException as e: 2489 if "Error.Failed: Reinit of the EAPOL" not in str(e): 2490 raise Exception("Unexpected error message for invalid SetPKCS11EngineAndModulePath: " + str(e)) 2491 2492 iface.SetPKCS11EngineAndModulePath("", "bar") 2493 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath", 2494 dbus_interface=dbus.PROPERTIES_IFACE) 2495 if res != "": 2496 raise Exception("Unexpected PKCS11EnginePath value: " + res) 2497 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath", 2498 dbus_interface=dbus.PROPERTIES_IFACE) 2499 if res != "bar": 2500 raise Exception("Unexpected PKCS11ModulePath value: " + res) 2501 2502 iface.SetPKCS11EngineAndModulePath("", "") 2503 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11EnginePath", 2504 dbus_interface=dbus.PROPERTIES_IFACE) 2505 if res != "": 2506 raise Exception("Unexpected PKCS11EnginePath value: " + res) 2507 res = if_obj.Get(WPAS_DBUS_IFACE, "PKCS11ModulePath", 2508 dbus_interface=dbus.PROPERTIES_IFACE) 2509 if res != "": 2510 raise Exception("Unexpected PKCS11ModulePath value: " + res) 2511 2512def test_dbus_apscan(dev, apdev): 2513 """D-Bus Get/Set ApScan""" 2514 try: 2515 _test_dbus_apscan(dev, apdev) 2516 finally: 2517 dev[0].request("AP_SCAN 1") 2518 2519def _test_dbus_apscan(dev, apdev): 2520 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2521 2522 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan", 2523 dbus_interface=dbus.PROPERTIES_IFACE) 2524 if res != 1: 2525 raise Exception("Unexpected initial ApScan value: %d" % res) 2526 2527 for i in range(3): 2528 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(i), 2529 dbus_interface=dbus.PROPERTIES_IFACE) 2530 res = if_obj.Get(WPAS_DBUS_IFACE, "ApScan", 2531 dbus_interface=dbus.PROPERTIES_IFACE) 2532 if res != i: 2533 raise Exception("Unexpected ApScan value %d (expected %d)" % (res, i)) 2534 2535 try: 2536 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.Int16(-1), 2537 dbus_interface=dbus.PROPERTIES_IFACE) 2538 raise Exception("Invalid Set(ApScan,-1) accepted") 2539 except dbus.exceptions.DBusException as e: 2540 if "Error.Failed: wrong property type" not in str(e): 2541 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e)) 2542 2543 try: 2544 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(123), 2545 dbus_interface=dbus.PROPERTIES_IFACE) 2546 raise Exception("Invalid Set(ApScan,123) accepted") 2547 except dbus.exceptions.DBusException as e: 2548 if "Error.Failed: ap_scan must be 0, 1, or 2" not in str(e): 2549 raise Exception("Unexpected error message for invalid Set(ApScan,123): " + str(e)) 2550 2551 if_obj.Set(WPAS_DBUS_IFACE, "ApScan", dbus.UInt32(1), 2552 dbus_interface=dbus.PROPERTIES_IFACE) 2553 2554def test_dbus_pmf(dev, apdev): 2555 """D-Bus Get/Set Pmf""" 2556 try: 2557 _test_dbus_pmf(dev, apdev) 2558 finally: 2559 dev[0].request("SET pmf 0") 2560 2561def _test_dbus_pmf(dev, apdev): 2562 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2563 2564 dev[0].set("pmf", "0") 2565 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf", 2566 dbus_interface=dbus.PROPERTIES_IFACE) 2567 if res != "0": 2568 raise Exception("Unexpected initial Pmf value: %s" % res) 2569 2570 for i in range(3): 2571 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", str(i), 2572 dbus_interface=dbus.PROPERTIES_IFACE) 2573 res = if_obj.Get(WPAS_DBUS_IFACE, "Pmf", 2574 dbus_interface=dbus.PROPERTIES_IFACE) 2575 if res != str(i): 2576 raise Exception("Unexpected Pmf value %s (expected %d)" % (res, i)) 2577 2578 if_obj.Set(WPAS_DBUS_IFACE, "Pmf", "1", 2579 dbus_interface=dbus.PROPERTIES_IFACE) 2580 2581def test_dbus_fastreauth(dev, apdev): 2582 """D-Bus Get/Set FastReauth""" 2583 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2584 2585 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth", 2586 dbus_interface=dbus.PROPERTIES_IFACE) 2587 if res != True: 2588 raise Exception("Unexpected initial FastReauth value: " + str(res)) 2589 2590 for i in [False, True]: 2591 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(i), 2592 dbus_interface=dbus.PROPERTIES_IFACE) 2593 res = if_obj.Get(WPAS_DBUS_IFACE, "FastReauth", 2594 dbus_interface=dbus.PROPERTIES_IFACE) 2595 if res != i: 2596 raise Exception("Unexpected FastReauth value %d (expected %d)" % (res, i)) 2597 2598 try: 2599 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Int16(-1), 2600 dbus_interface=dbus.PROPERTIES_IFACE) 2601 raise Exception("Invalid Set(FastReauth,-1) accepted") 2602 except dbus.exceptions.DBusException as e: 2603 if "Error.Failed: wrong property type" not in str(e): 2604 raise Exception("Unexpected error message for invalid Set(ApScan,-1): " + str(e)) 2605 2606 if_obj.Set(WPAS_DBUS_IFACE, "FastReauth", dbus.Boolean(True), 2607 dbus_interface=dbus.PROPERTIES_IFACE) 2608 2609def test_dbus_bss_expire(dev, apdev): 2610 """D-Bus Get/Set BSSExpireAge and BSSExpireCount""" 2611 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2612 2613 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(179), 2614 dbus_interface=dbus.PROPERTIES_IFACE) 2615 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireAge", 2616 dbus_interface=dbus.PROPERTIES_IFACE) 2617 if res != 179: 2618 raise Exception("Unexpected BSSExpireAge value %d (expected %d)" % (res, i)) 2619 2620 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(3), 2621 dbus_interface=dbus.PROPERTIES_IFACE) 2622 res = if_obj.Get(WPAS_DBUS_IFACE, "BSSExpireCount", 2623 dbus_interface=dbus.PROPERTIES_IFACE) 2624 if res != 3: 2625 raise Exception("Unexpected BSSExpireCount value %d (expected %d)" % (res, i)) 2626 2627 try: 2628 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.Int16(-1), 2629 dbus_interface=dbus.PROPERTIES_IFACE) 2630 raise Exception("Invalid Set(BSSExpireAge,-1) accepted") 2631 except dbus.exceptions.DBusException as e: 2632 if "Error.Failed: wrong property type" not in str(e): 2633 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,-1): " + str(e)) 2634 2635 try: 2636 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(9), 2637 dbus_interface=dbus.PROPERTIES_IFACE) 2638 raise Exception("Invalid Set(BSSExpireAge,9) accepted") 2639 except dbus.exceptions.DBusException as e: 2640 if "Error.Failed: BSSExpireAge must be >= 10" not in str(e): 2641 raise Exception("Unexpected error message for invalid Set(BSSExpireAge,9): " + str(e)) 2642 2643 try: 2644 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.Int16(-1), 2645 dbus_interface=dbus.PROPERTIES_IFACE) 2646 raise Exception("Invalid Set(BSSExpireCount,-1) accepted") 2647 except dbus.exceptions.DBusException as e: 2648 if "Error.Failed: wrong property type" not in str(e): 2649 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,-1): " + str(e)) 2650 2651 try: 2652 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(0), 2653 dbus_interface=dbus.PROPERTIES_IFACE) 2654 raise Exception("Invalid Set(BSSExpireCount,0) accepted") 2655 except dbus.exceptions.DBusException as e: 2656 if "Error.Failed: BSSExpireCount must be > 0" not in str(e): 2657 raise Exception("Unexpected error message for invalid Set(BSSExpireCount,0): " + str(e)) 2658 2659 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireAge", dbus.UInt32(180), 2660 dbus_interface=dbus.PROPERTIES_IFACE) 2661 if_obj.Set(WPAS_DBUS_IFACE, "BSSExpireCount", dbus.UInt32(2), 2662 dbus_interface=dbus.PROPERTIES_IFACE) 2663 2664def test_dbus_country(dev, apdev): 2665 """D-Bus Get/Set Country""" 2666 try: 2667 _test_dbus_country(dev, apdev) 2668 finally: 2669 dev[0].request("SET country 00") 2670 subprocess.call(['iw', 'reg', 'set', '00']) 2671 2672def _test_dbus_country(dev, apdev): 2673 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2674 2675 # work around issues with possible pending regdom event from the end of 2676 # the previous test case 2677 time.sleep(0.2) 2678 dev[0].dump_monitor() 2679 2680 if_obj.Set(WPAS_DBUS_IFACE, "Country", "FI", 2681 dbus_interface=dbus.PROPERTIES_IFACE) 2682 res = if_obj.Get(WPAS_DBUS_IFACE, "Country", 2683 dbus_interface=dbus.PROPERTIES_IFACE) 2684 if res != "FI": 2685 raise Exception("Unexpected Country value %s (expected FI)" % res) 2686 2687 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"]) 2688 if ev is None: 2689 # For now, work around separate P2P Device interface event delivery 2690 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2691 if ev is None: 2692 raise Exception("regdom change event not seen") 2693 if "init=USER type=COUNTRY alpha2=FI" not in ev: 2694 raise Exception("Unexpected event contents: " + ev) 2695 2696 try: 2697 if_obj.Set(WPAS_DBUS_IFACE, "Country", dbus.Int16(-1), 2698 dbus_interface=dbus.PROPERTIES_IFACE) 2699 raise Exception("Invalid Set(Country,-1) accepted") 2700 except dbus.exceptions.DBusException as e: 2701 if "Error.Failed: wrong property type" not in str(e): 2702 raise Exception("Unexpected error message for invalid Set(Country,-1): " + str(e)) 2703 2704 try: 2705 if_obj.Set(WPAS_DBUS_IFACE, "Country", "F", 2706 dbus_interface=dbus.PROPERTIES_IFACE) 2707 raise Exception("Invalid Set(Country,F) accepted") 2708 except dbus.exceptions.DBusException as e: 2709 if "Error.Failed: invalid country code" not in str(e): 2710 raise Exception("Unexpected error message for invalid Set(Country,F): " + str(e)) 2711 2712 if_obj.Set(WPAS_DBUS_IFACE, "Country", "00", 2713 dbus_interface=dbus.PROPERTIES_IFACE) 2714 2715 ev = dev[0].wait_event(["CTRL-EVENT-REGDOM-CHANGE"]) 2716 if ev is None: 2717 # For now, work around separate P2P Device interface event delivery 2718 ev = dev[0].wait_global_event(["CTRL-EVENT-REGDOM-CHANGE"], timeout=1) 2719 if ev is None: 2720 raise Exception("regdom change event not seen") 2721 # init=CORE was previously used due to invalid db.txt data for 00. For 2722 # now, allow both it and the new init=USER after fixed db.txt. 2723 if "init=CORE type=WORLD" not in ev and "init=USER type=WORLD" not in ev: 2724 raise Exception("Unexpected event contents: " + ev) 2725 2726def test_dbus_scan_interval(dev, apdev): 2727 """D-Bus Get/Set ScanInterval""" 2728 try: 2729 _test_dbus_scan_interval(dev, apdev) 2730 finally: 2731 dev[0].request("SCAN_INTERVAL 5") 2732 2733def _test_dbus_scan_interval(dev, apdev): 2734 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2735 2736 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(3), 2737 dbus_interface=dbus.PROPERTIES_IFACE) 2738 res = if_obj.Get(WPAS_DBUS_IFACE, "ScanInterval", 2739 dbus_interface=dbus.PROPERTIES_IFACE) 2740 if res != 3: 2741 raise Exception("Unexpected ScanInterval value %d (expected %d)" % (res, i)) 2742 2743 try: 2744 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.UInt16(100), 2745 dbus_interface=dbus.PROPERTIES_IFACE) 2746 raise Exception("Invalid Set(ScanInterval,100) accepted") 2747 except dbus.exceptions.DBusException as e: 2748 if "Error.Failed: wrong property type" not in str(e): 2749 raise Exception("Unexpected error message for invalid Set(ScanInterval,100): " + str(e)) 2750 2751 try: 2752 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(-1), 2753 dbus_interface=dbus.PROPERTIES_IFACE) 2754 raise Exception("Invalid Set(ScanInterval,-1) accepted") 2755 except dbus.exceptions.DBusException as e: 2756 if "Error.Failed: scan_interval must be >= 0" not in str(e): 2757 raise Exception("Unexpected error message for invalid Set(ScanInterval,-1): " + str(e)) 2758 2759 if_obj.Set(WPAS_DBUS_IFACE, "ScanInterval", dbus.Int32(5), 2760 dbus_interface=dbus.PROPERTIES_IFACE) 2761 2762def test_dbus_probe_req_reporting(dev, apdev): 2763 """D-Bus Probe Request reporting""" 2764 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2765 2766 dev[1].p2p_find(social=True) 2767 2768 class TestDbusProbe(TestDbus): 2769 def __init__(self, bus): 2770 TestDbus.__init__(self, bus) 2771 self.reported = False 2772 2773 def __enter__(self): 2774 gobject.timeout_add(1, self.run_test) 2775 gobject.timeout_add(15000, self.timeout) 2776 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 2777 "GroupStarted") 2778 self.add_signal(self.probeRequest, WPAS_DBUS_IFACE, "ProbeRequest", 2779 byte_arrays=True) 2780 self.loop.run() 2781 return self 2782 2783 def groupStarted(self, properties): 2784 logger.debug("groupStarted: " + str(properties)) 2785 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 2786 properties['interface_object']) 2787 self.iface = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE) 2788 self.iface.SubscribeProbeReq() 2789 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2790 2791 def probeRequest(self, args): 2792 logger.debug("probeRequest: args=%s" % str(args)) 2793 self.reported = True 2794 self.loop.quit() 2795 2796 def run_test(self, *args): 2797 logger.debug("run_test") 2798 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2799 params = dbus.Dictionary({'frequency': 2412}) 2800 p2p.GroupAdd(params) 2801 return False 2802 2803 def success(self): 2804 return self.reported 2805 2806 with TestDbusProbe(bus) as t: 2807 if not t.success(): 2808 raise Exception("Expected signals not seen") 2809 t.iface.UnsubscribeProbeReq() 2810 try: 2811 t.iface.UnsubscribeProbeReq() 2812 raise Exception("Invalid UnsubscribeProbeReq() accepted") 2813 except dbus.exceptions.DBusException as e: 2814 if "NoSubscription" not in str(e): 2815 raise Exception("Unexpected error message for invalid UnsubscribeProbeReq(): " + str(e)) 2816 t.group_p2p.Disconnect() 2817 2818 with TestDbusProbe(bus) as t: 2819 if not t.success(): 2820 raise Exception("Expected signals not seen") 2821 # On purpose, leave ProbeReq subscription in place to test automatic 2822 # cleanup. 2823 2824 dev[1].p2p_stop_find() 2825 2826def test_dbus_probe_req_reporting_oom(dev, apdev): 2827 """D-Bus Probe Request reporting (OOM)""" 2828 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2829 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 2830 2831 # Need to make sure this process has not already subscribed to avoid false 2832 # failures due to the operation succeeding due to os_strdup() not even 2833 # getting called. 2834 try: 2835 iface.UnsubscribeProbeReq() 2836 was_subscribed = True 2837 except dbus.exceptions.DBusException as e: 2838 was_subscribed = False 2839 pass 2840 2841 with alloc_fail_dbus(dev[0], 1, "wpas_dbus_handler_subscribe_preq", 2842 "SubscribeProbeReq"): 2843 iface.SubscribeProbeReq() 2844 2845 if was_subscribed: 2846 # On purpose, leave ProbeReq subscription in place to test automatic 2847 # cleanup. 2848 iface.SubscribeProbeReq() 2849 2850def test_dbus_p2p_invalid(dev, apdev): 2851 """D-Bus invalid P2P operations""" 2852 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 2853 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2854 2855 try: 2856 p2p.RejectPeer(path + "/Peers/00112233445566") 2857 raise Exception("Invalid RejectPeer accepted") 2858 except dbus.exceptions.DBusException as e: 2859 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e): 2860 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e)) 2861 2862 try: 2863 p2p.RejectPeer("/foo") 2864 raise Exception("Invalid RejectPeer accepted") 2865 except dbus.exceptions.DBusException as e: 2866 if "InvalidArgs" not in str(e): 2867 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e)) 2868 2869 tests = [{}, 2870 {'peer': 'foo'}, 2871 {'foo': "bar"}, 2872 {'iface': "abc"}, 2873 {'iface': 123}] 2874 for t in tests: 2875 try: 2876 p2p.RemoveClient(t) 2877 raise Exception("Invalid RemoveClient accepted") 2878 except dbus.exceptions.DBusException as e: 2879 if "InvalidArgs" not in str(e): 2880 raise Exception("Unexpected error message for invalid RemoveClient(): " + str(e)) 2881 2882 tests = [{'DiscoveryType': 'foo'}, 2883 {'RequestedDeviceTypes': 'foo'}, 2884 {'RequestedDeviceTypes': ['foo']}, 2885 {'RequestedDeviceTypes': ['1', '2', '3', '4', '5', '6', '7', '8', 2886 '9', '10', '11', '12', '13', '14', '15', 2887 '16', '17']}, 2888 {'RequestedDeviceTypes': dbus.Array([], signature="s")}, 2889 {'RequestedDeviceTypes': dbus.Array([['foo']], signature="as")}, 2890 {'RequestedDeviceTypes': dbus.Array([], signature="i")}, 2891 {'RequestedDeviceTypes': [dbus.ByteArray(b'12345678'), 2892 dbus.ByteArray(b'1234567')]}, 2893 {'Foo': dbus.Int16(1)}, 2894 {'Foo': dbus.UInt16(1)}, 2895 {'Foo': dbus.Int64(1)}, 2896 {'Foo': dbus.UInt64(1)}, 2897 {'Foo': dbus.Double(1.23)}, 2898 {'Foo': dbus.Signature('s')}, 2899 {'Foo': 'bar'}] 2900 for t in tests: 2901 try: 2902 p2p.Find(dbus.Dictionary(t)) 2903 raise Exception("Invalid Find accepted") 2904 except dbus.exceptions.DBusException as e: 2905 if "InvalidArgs" not in str(e): 2906 raise Exception("Unexpected error message for invalid Find(): " + str(e)) 2907 2908 for p in ["/foo", 2909 "/fi/w1/wpa_supplicant1/Interfaces/1234", 2910 "/fi/w1/wpa_supplicant1/Interfaces/1234/Networks/1234"]: 2911 try: 2912 p2p.RemovePersistentGroup(dbus.ObjectPath(p)) 2913 raise Exception("Invalid RemovePersistentGroup accepted") 2914 except dbus.exceptions.DBusException as e: 2915 if "InvalidArgs" not in str(e): 2916 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e)) 2917 2918 try: 2919 dev[0].request("P2P_SET disabled 1") 2920 p2p.Listen(5) 2921 raise Exception("Invalid Listen accepted") 2922 except dbus.exceptions.DBusException as e: 2923 if "UnknownError: Could not start P2P listen" not in str(e): 2924 raise Exception("Unexpected error message for invalid Listen: " + str(e)) 2925 finally: 2926 dev[0].request("P2P_SET disabled 0") 2927 2928 test_obj = bus.get_object(WPAS_DBUS_SERVICE, path, introspect=False) 2929 test_p2p = dbus.Interface(test_obj, WPAS_DBUS_IFACE_P2PDEVICE) 2930 try: 2931 test_p2p.Listen("foo") 2932 raise Exception("Invalid Listen accepted") 2933 except dbus.exceptions.DBusException as e: 2934 if "InvalidArgs" not in str(e): 2935 raise Exception("Unexpected error message for invalid Listen: " + str(e)) 2936 2937 try: 2938 dev[0].request("P2P_SET disabled 1") 2939 p2p.ExtendedListen(dbus.Dictionary({})) 2940 raise Exception("Invalid ExtendedListen accepted") 2941 except dbus.exceptions.DBusException as e: 2942 if "UnknownError: failed to initiate a p2p_ext_listen" not in str(e): 2943 raise Exception("Unexpected error message for invalid ExtendedListen: " + str(e)) 2944 finally: 2945 dev[0].request("P2P_SET disabled 0") 2946 2947 try: 2948 dev[0].request("P2P_SET disabled 1") 2949 args = {'duration1': 30000, 'interval1': 102400, 2950 'duration2': 20000, 'interval2': 102400} 2951 p2p.PresenceRequest(args) 2952 raise Exception("Invalid PresenceRequest accepted") 2953 except dbus.exceptions.DBusException as e: 2954 if "UnknownError: Failed to invoke presence request" not in str(e): 2955 raise Exception("Unexpected error message for invalid PresenceRequest: " + str(e)) 2956 finally: 2957 dev[0].request("P2P_SET disabled 0") 2958 2959 try: 2960 params = dbus.Dictionary({'frequency': dbus.Int32(-1)}) 2961 p2p.GroupAdd(params) 2962 raise Exception("Invalid GroupAdd accepted") 2963 except dbus.exceptions.DBusException as e: 2964 if "InvalidArgs" not in str(e): 2965 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e)) 2966 2967 try: 2968 params = dbus.Dictionary({'persistent_group_object': 2969 dbus.ObjectPath(path), 2970 'frequency': 2412}) 2971 p2p.GroupAdd(params) 2972 raise Exception("Invalid GroupAdd accepted") 2973 except dbus.exceptions.DBusException as e: 2974 if "InvalidArgs" not in str(e): 2975 raise Exception("Unexpected error message for invalid GroupAdd: " + str(e)) 2976 2977 try: 2978 p2p.Disconnect() 2979 raise Exception("Invalid Disconnect accepted") 2980 except dbus.exceptions.DBusException as e: 2981 if "UnknownError: failed to disconnect" not in str(e): 2982 raise Exception("Unexpected error message for invalid Disconnect: " + str(e)) 2983 2984 try: 2985 dev[0].request("P2P_SET disabled 1") 2986 p2p.Flush() 2987 raise Exception("Invalid Flush accepted") 2988 except dbus.exceptions.DBusException as e: 2989 if "Error.Failed: P2P is not available for this interface" not in str(e): 2990 raise Exception("Unexpected error message for invalid Flush: " + str(e)) 2991 finally: 2992 dev[0].request("P2P_SET disabled 0") 2993 2994 try: 2995 dev[0].request("P2P_SET disabled 1") 2996 args = {'peer': path, 2997 'join': True, 2998 'wps_method': 'pbc', 2999 'frequency': 2412} 3000 pin = p2p.Connect(args) 3001 raise Exception("Invalid Connect accepted") 3002 except dbus.exceptions.DBusException as e: 3003 if "Error.Failed: P2P is not available for this interface" not in str(e): 3004 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 3005 finally: 3006 dev[0].request("P2P_SET disabled 0") 3007 3008 tests = [{'frequency': dbus.Int32(-1)}, 3009 {'wps_method': 'pbc'}, 3010 {'wps_method': 'foo'}] 3011 for args in tests: 3012 try: 3013 pin = p2p.Connect(args) 3014 raise Exception("Invalid Connect accepted") 3015 except dbus.exceptions.DBusException as e: 3016 if "InvalidArgs" not in str(e): 3017 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 3018 3019 try: 3020 dev[0].request("P2P_SET disabled 1") 3021 args = {'peer': path} 3022 pin = p2p.Invite(args) 3023 raise Exception("Invalid Invite accepted") 3024 except dbus.exceptions.DBusException as e: 3025 if "Error.Failed: P2P is not available for this interface" not in str(e): 3026 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 3027 finally: 3028 dev[0].request("P2P_SET disabled 0") 3029 3030 try: 3031 args = {'foo': 'bar'} 3032 pin = p2p.Invite(args) 3033 raise Exception("Invalid Invite accepted") 3034 except dbus.exceptions.DBusException as e: 3035 if "InvalidArgs" not in str(e): 3036 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 3037 3038 tests = [(path, 'display', "InvalidArgs"), 3039 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3040 'display', 3041 "UnknownError: Failed to send provision discovery request"), 3042 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3043 'keypad', 3044 "UnknownError: Failed to send provision discovery request"), 3045 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3046 'pbc', 3047 "UnknownError: Failed to send provision discovery request"), 3048 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3049 'pushbutton', 3050 "UnknownError: Failed to send provision discovery request"), 3051 (dbus.ObjectPath(path + "/Peers/00112233445566"), 3052 'foo', "InvalidArgs")] 3053 for (p, method, err) in tests: 3054 try: 3055 p2p.ProvisionDiscoveryRequest(p, method) 3056 raise Exception("Invalid ProvisionDiscoveryRequest accepted") 3057 except dbus.exceptions.DBusException as e: 3058 if err not in str(e): 3059 raise Exception("Unexpected error message for invalid ProvisionDiscoveryRequest: " + str(e)) 3060 3061 try: 3062 dev[0].request("P2P_SET disabled 1") 3063 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers", 3064 dbus_interface=dbus.PROPERTIES_IFACE) 3065 raise Exception("Invalid Get(Peers) accepted") 3066 except dbus.exceptions.DBusException as e: 3067 if "Error.Failed: P2P is not available for this interface" not in str(e): 3068 raise Exception("Unexpected error message for invalid Get(Peers): " + str(e)) 3069 finally: 3070 dev[0].request("P2P_SET disabled 0") 3071 3072def test_dbus_p2p_oom(dev, apdev): 3073 """D-Bus P2P operations and OOM""" 3074 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3075 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3076 3077 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_string_array", 3078 "Find", "InvalidArgs"): 3079 p2p.Find(dbus.Dictionary({'Foo': ['bar']})) 3080 3081 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_string_array", 3082 "Find", "InvalidArgs"): 3083 p2p.Find(dbus.Dictionary({'Foo': ['bar']})) 3084 3085 with alloc_fail_dbus(dev[0], 10, "_wpa_dbus_dict_entry_get_string_array", 3086 "Find", "InvalidArgs"): 3087 p2p.Find(dbus.Dictionary({'Foo': ['1', '2', '3', '4', '5', '6', '7', 3088 '8', '9']})) 3089 3090 with alloc_fail_dbus(dev[0], 1, ":=_wpa_dbus_dict_entry_get_binarray", 3091 "Find", "InvalidArgs"): 3092 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]})) 3093 3094 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array;_wpa_dbus_dict_entry_get_binarray", 3095 "Find", "InvalidArgs"): 3096 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]})) 3097 3098 with alloc_fail_dbus(dev[0], 2, "=_wpa_dbus_dict_entry_get_binarray", 3099 "Find", "InvalidArgs"): 3100 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123'), 3101 dbus.ByteArray(b'123'), 3102 dbus.ByteArray(b'123'), 3103 dbus.ByteArray(b'123'), 3104 dbus.ByteArray(b'123'), 3105 dbus.ByteArray(b'123'), 3106 dbus.ByteArray(b'123'), 3107 dbus.ByteArray(b'123'), 3108 dbus.ByteArray(b'123'), 3109 dbus.ByteArray(b'123'), 3110 dbus.ByteArray(b'123')]})) 3111 3112 with alloc_fail_dbus(dev[0], 1, "wpabuf_alloc_ext_data;_wpa_dbus_dict_entry_get_binarray", 3113 "Find", "InvalidArgs"): 3114 p2p.Find(dbus.Dictionary({'Foo': [dbus.ByteArray(b'123')]})) 3115 3116 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_fill_value_from_variant;wpas_dbus_handler_p2p_find", 3117 "Find", "InvalidArgs"): 3118 p2p.Find(dbus.Dictionary({'Foo': path})) 3119 3120 with alloc_fail_dbus(dev[0], 1, "_wpa_dbus_dict_entry_get_byte_array", 3121 "AddService", "InvalidArgs"): 3122 args = {'service_type': 'bonjour', 3123 'response': dbus.ByteArray(500*b'b')} 3124 p2p.AddService(args) 3125 3126 with alloc_fail_dbus(dev[0], 2, "_wpa_dbus_dict_entry_get_byte_array", 3127 "AddService", "InvalidArgs"): 3128 p2p.AddService(args) 3129 3130def test_dbus_p2p_discovery(dev, apdev): 3131 """D-Bus P2P discovery""" 3132 try: 3133 run_dbus_p2p_discovery(dev, apdev) 3134 finally: 3135 dev[1].request("VENDOR_ELEM_REMOVE 1 *") 3136 3137def run_dbus_p2p_discovery(dev, apdev): 3138 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3139 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3140 3141 addr0 = dev[0].p2p_dev_addr() 3142 3143 dev[1].request("SET sec_device_type 1-0050F204-2") 3144 dev[1].request("VENDOR_ELEM_ADD 1 dd0c0050f2041049000411223344") 3145 dev[1].request("VENDOR_ELEM_ADD 1 dd06001122335566") 3146 dev[1].p2p_listen() 3147 addr1 = dev[1].p2p_dev_addr() 3148 a1 = binascii.unhexlify(addr1.replace(':', '')) 3149 3150 wfd_devinfo = "00001c440028" 3151 dev[2].request("SET wifi_display 1") 3152 dev[2].request("WFD_SUBELEM_SET 0 0006" + wfd_devinfo) 3153 wfd = binascii.unhexlify('000006' + wfd_devinfo) 3154 dev[2].p2p_listen() 3155 addr2 = dev[2].p2p_dev_addr() 3156 a2 = binascii.unhexlify(addr2.replace(':', '')) 3157 3158 res = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE, 3159 dbus_interface=dbus.PROPERTIES_IFACE) 3160 if 'Peers' not in res: 3161 raise Exception("GetAll result missing Peers") 3162 if len(res['Peers']) != 0: 3163 raise Exception("Unexpected peer(s) in the list") 3164 3165 args = {'DiscoveryType': 'social', 3166 'RequestedDeviceTypes': [dbus.ByteArray(b'12345678')], 3167 'Timeout': dbus.Int32(1)} 3168 p2p.Find(dbus.Dictionary(args)) 3169 p2p.StopFind() 3170 3171 class TestDbusP2p(TestDbus): 3172 def __init__(self, bus): 3173 TestDbus.__init__(self, bus) 3174 self.found = False 3175 self.found2 = False 3176 self.found_prop = False 3177 self.lost = False 3178 self.find_stopped = False 3179 3180 def __enter__(self): 3181 gobject.timeout_add(1, self.run_test) 3182 gobject.timeout_add(15000, self.timeout) 3183 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3184 "DeviceFound") 3185 self.add_signal(self.deviceFoundProperties, 3186 WPAS_DBUS_IFACE_P2PDEVICE, "DeviceFoundProperties") 3187 self.add_signal(self.deviceLost, WPAS_DBUS_IFACE_P2PDEVICE, 3188 "DeviceLost") 3189 self.add_signal(self.provisionDiscoveryResponseEnterPin, 3190 WPAS_DBUS_IFACE_P2PDEVICE, 3191 "ProvisionDiscoveryResponseEnterPin") 3192 self.add_signal(self.findStopped, WPAS_DBUS_IFACE_P2PDEVICE, 3193 "FindStopped") 3194 self.loop.run() 3195 return self 3196 3197 def deviceFound(self, path): 3198 logger.debug("deviceFound: path=%s" % path) 3199 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Peers", 3200 dbus_interface=dbus.PROPERTIES_IFACE) 3201 if len(res) < 1: 3202 raise Exception("Unexpected number of peers") 3203 if path not in res: 3204 raise Exception("Mismatch in peer object path") 3205 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 3206 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3207 dbus_interface=dbus.PROPERTIES_IFACE, 3208 byte_arrays=True) 3209 logger.debug("peer properties: " + str(res)) 3210 3211 if res['DeviceAddress'] == a1: 3212 if 'SecondaryDeviceTypes' not in res: 3213 raise Exception("Missing SecondaryDeviceTypes") 3214 sec = res['SecondaryDeviceTypes'] 3215 if len(sec) < 1: 3216 raise Exception("Secondary device type missing") 3217 if b"\x00\x01\x00\x50\xF2\x04\x00\x02" not in sec: 3218 raise Exception("Secondary device type mismatch") 3219 3220 if 'VendorExtension' not in res: 3221 raise Exception("Missing VendorExtension") 3222 vendor = res['VendorExtension'] 3223 if len(vendor) < 1: 3224 raise Exception("Vendor extension missing") 3225 if b"\x11\x22\x33\x44" not in vendor: 3226 raise Exception("Secondary device type mismatch") 3227 3228 if 'VSIE' not in res: 3229 raise Exception("Missing VSIE") 3230 vendor = res['VSIE'] 3231 if len(vendor) < 1: 3232 raise Exception("VSIE missing") 3233 if vendor != b"\xdd\x06\x00\x11\x22\x33\x55\x66": 3234 raise Exception("VSIE mismatch") 3235 3236 self.found = True 3237 elif res['DeviceAddress'] == a2: 3238 if 'IEs' not in res: 3239 raise Exception("IEs missing") 3240 if res['IEs'] != wfd: 3241 raise Exception("IEs mismatch") 3242 self.found2 = True 3243 else: 3244 raise Exception("Unexpected peer device address") 3245 3246 if self.found and self.found2: 3247 p2p.StopFind() 3248 p2p.RejectPeer(path) 3249 p2p.ProvisionDiscoveryRequest(path, 'display') 3250 3251 def deviceLost(self, path): 3252 logger.debug("deviceLost: path=%s" % path) 3253 if not self.found or not self.found2: 3254 # This may happen if a previous test case ended up scheduling 3255 # deviceLost event and that event did not get delivered before 3256 # starting the next test execution. 3257 logger.debug("Ignore deviceLost before the deviceFound events") 3258 return 3259 self.lost = True 3260 try: 3261 p2p.RejectPeer(path) 3262 raise Exception("Invalid RejectPeer accepted") 3263 except dbus.exceptions.DBusException as e: 3264 if "UnknownError: Failed to call wpas_p2p_reject" not in str(e): 3265 raise Exception("Unexpected error message for invalid RejectPeer(): " + str(e)) 3266 self.loop.quit() 3267 3268 def deviceFoundProperties(self, path, properties): 3269 logger.debug("deviceFoundProperties: path=%s" % path) 3270 logger.debug("peer properties: " + str(properties)) 3271 if properties['DeviceAddress'] == a1: 3272 self.found_prop = True 3273 3274 def provisionDiscoveryResponseEnterPin(self, peer_object): 3275 logger.debug("provisionDiscoveryResponseEnterPin - peer=%s" % peer_object) 3276 p2p.Flush() 3277 3278 def findStopped(self): 3279 logger.debug("findStopped") 3280 self.find_stopped = True 3281 3282 def run_test(self, *args): 3283 logger.debug("run_test") 3284 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social', 3285 'Timeout': dbus.Int32(10)})) 3286 return False 3287 3288 def success(self): 3289 return self.found and self.lost and self.found2 and self.find_stopped 3290 3291 with TestDbusP2p(bus) as t: 3292 if not t.success(): 3293 raise Exception("Expected signals not seen") 3294 3295 dev[1].request("VENDOR_ELEM_REMOVE 1 *") 3296 dev[1].p2p_stop_find() 3297 3298 p2p.Listen(1) 3299 dev[2].p2p_stop_find() 3300 dev[2].request("P2P_FLUSH") 3301 if not dev[2].discover_peer(addr0): 3302 raise Exception("Peer not found") 3303 p2p.StopFind() 3304 dev[2].p2p_stop_find() 3305 3306 try: 3307 p2p.ExtendedListen(dbus.Dictionary({'foo': 100})) 3308 raise Exception("Invalid ExtendedListen accepted") 3309 except dbus.exceptions.DBusException as e: 3310 if "InvalidArgs" not in str(e): 3311 raise Exception("Unexpected error message for invalid ExtendedListen(): " + str(e)) 3312 3313 p2p.ExtendedListen(dbus.Dictionary({'period': 100, 'interval': 1000})) 3314 p2p.ExtendedListen(dbus.Dictionary({})) 3315 dev[0].global_request("P2P_EXT_LISTEN") 3316 3317def test_dbus_p2p_discovery_freq(dev, apdev): 3318 """D-Bus P2P discovery on a specific non-social channel""" 3319 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3320 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3321 3322 addr1 = dev[1].p2p_dev_addr() 3323 autogo(dev[1], freq=2422) 3324 3325 class TestDbusP2p(TestDbus): 3326 def __init__(self, bus): 3327 TestDbus.__init__(self, bus) 3328 self.found = False 3329 3330 def __enter__(self): 3331 gobject.timeout_add(1, self.run_test) 3332 gobject.timeout_add(5000, self.timeout) 3333 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3334 "DeviceFound") 3335 self.loop.run() 3336 return self 3337 3338 def deviceFound(self, path): 3339 logger.debug("deviceFound: path=%s" % path) 3340 self.found = True 3341 self.loop.quit() 3342 3343 def run_test(self, *args): 3344 logger.debug("run_test") 3345 p2p.Find(dbus.Dictionary({'freq': 2422})) 3346 return False 3347 3348 def success(self): 3349 return self.found 3350 3351 with TestDbusP2p(bus) as t: 3352 if not t.success(): 3353 raise Exception("Expected signals not seen") 3354 3355 dev[1].remove_group() 3356 p2p.StopFind() 3357 3358def test_dbus_p2p_service_discovery(dev, apdev): 3359 """D-Bus P2P service discovery""" 3360 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3361 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3362 3363 addr0 = dev[0].p2p_dev_addr() 3364 addr1 = dev[1].p2p_dev_addr() 3365 3366 bonjour_query = dbus.ByteArray(binascii.unhexlify('0b5f6166706f766572746370c00c000c01')) 3367 bonjour_response = dbus.ByteArray(binascii.unhexlify('074578616d706c65c027')) 3368 3369 args = {'service_type': 'bonjour', 3370 'query': bonjour_query, 3371 'response': bonjour_response} 3372 p2p.AddService(args) 3373 p2p.FlushService() 3374 p2p.AddService(args) 3375 3376 try: 3377 p2p.DeleteService(args) 3378 raise Exception("Invalid DeleteService() accepted") 3379 except dbus.exceptions.DBusException as e: 3380 if "InvalidArgs" not in str(e): 3381 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3382 3383 args = {'service_type': 'bonjour', 3384 'query': bonjour_query} 3385 p2p.DeleteService(args) 3386 try: 3387 p2p.DeleteService(args) 3388 raise Exception("Invalid DeleteService() accepted") 3389 except dbus.exceptions.DBusException as e: 3390 if "InvalidArgs" not in str(e): 3391 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3392 3393 args = {'service_type': 'upnp', 3394 'version': 0x10, 3395 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'} 3396 p2p.AddService(args) 3397 p2p.DeleteService(args) 3398 try: 3399 p2p.DeleteService(args) 3400 raise Exception("Invalid DeleteService() accepted") 3401 except dbus.exceptions.DBusException as e: 3402 if "InvalidArgs" not in str(e): 3403 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3404 3405 tests = [{'service_type': 'foo'}, 3406 {'service_type': 'foo', 'query': bonjour_query}, 3407 {'service_type': 'upnp'}, 3408 {'service_type': 'upnp', 'version': 0x10}, 3409 {'service_type': 'upnp', 3410 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3411 {'version': 0x10, 3412 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3413 {'service_type': 'upnp', 'foo': 'bar'}, 3414 {'service_type': 'bonjour'}, 3415 {'service_type': 'bonjour', 'query': 'foo'}, 3416 {'service_type': 'bonjour', 'foo': 'bar'}] 3417 for args in tests: 3418 try: 3419 p2p.DeleteService(args) 3420 raise Exception("Invalid DeleteService() accepted") 3421 except dbus.exceptions.DBusException as e: 3422 if "InvalidArgs" not in str(e): 3423 raise Exception("Unexpected error message for invalid DeleteService(): " + str(e)) 3424 3425 tests = [{'service_type': 'foo'}, 3426 {'service_type': 'upnp'}, 3427 {'service_type': 'upnp', 'version': 0x10}, 3428 {'service_type': 'upnp', 3429 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3430 {'version': 0x10, 3431 'service': 'uuid:6859dede-8574-59ab-9332-123456789012::upnp:rootdevice'}, 3432 {'service_type': 'upnp', 'foo': 'bar'}, 3433 {'service_type': 'bonjour'}, 3434 {'service_type': 'bonjour', 'query': 'foo'}, 3435 {'service_type': 'bonjour', 'response': 'foo'}, 3436 {'service_type': 'bonjour', 'query': bonjour_query}, 3437 {'service_type': 'bonjour', 'response': bonjour_response}, 3438 {'service_type': 'bonjour', 'query': dbus.ByteArray(500*b'a')}, 3439 {'service_type': 'bonjour', 'foo': 'bar'}] 3440 for args in tests: 3441 try: 3442 p2p.AddService(args) 3443 raise Exception("Invalid AddService() accepted") 3444 except dbus.exceptions.DBusException as e: 3445 if "InvalidArgs" not in str(e): 3446 raise Exception("Unexpected error message for invalid AddService(): " + str(e)) 3447 3448 args = {'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")} 3449 ref = p2p.ServiceDiscoveryRequest(args) 3450 p2p.ServiceDiscoveryCancelRequest(ref) 3451 try: 3452 p2p.ServiceDiscoveryCancelRequest(ref) 3453 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted") 3454 except dbus.exceptions.DBusException as e: 3455 if "InvalidArgs" not in str(e): 3456 raise Exception("Unexpected error message for invalid AddService(): " + str(e)) 3457 try: 3458 p2p.ServiceDiscoveryCancelRequest(dbus.UInt64(0)) 3459 raise Exception("Invalid ServiceDiscoveryCancelRequest() accepted") 3460 except dbus.exceptions.DBusException as e: 3461 if "InvalidArgs" not in str(e): 3462 raise Exception("Unexpected error message for invalid AddService(): " + str(e)) 3463 3464 args = {'service_type': 'upnp', 3465 'version': 0x10, 3466 'service': 'ssdp:foo'} 3467 ref = p2p.ServiceDiscoveryRequest(args) 3468 p2p.ServiceDiscoveryCancelRequest(ref) 3469 3470 tests = [{'service_type': 'foo'}, 3471 {'foo': 'bar'}, 3472 {'tlv': 'foo'}, 3473 {}, 3474 {'version': 0}, 3475 {'service_type': 'upnp', 3476 'service': 'ssdp:foo'}, 3477 {'service_type': 'upnp', 3478 'version': 0x10}, 3479 {'service_type': 'upnp', 3480 'version': 0x10, 3481 'service': 'ssdp:foo', 3482 'peer_object': dbus.ObjectPath(path + "/Peers")}, 3483 {'service_type': 'upnp', 3484 'version': 0x10, 3485 'service': 'ssdp:foo', 3486 'peer_object': path + "/Peers"}, 3487 {'service_type': 'upnp', 3488 'version': 0x10, 3489 'service': 'ssdp:foo', 3490 'peer_object': dbus.ObjectPath(path + "/Peers/00112233445566")}] 3491 for args in tests: 3492 try: 3493 p2p.ServiceDiscoveryRequest(args) 3494 raise Exception("Invalid ServiceDiscoveryRequest accepted") 3495 except dbus.exceptions.DBusException as e: 3496 if "InvalidArgs" not in str(e): 3497 raise Exception("Unexpected error message for invalid ServiceDiscoveryRequest(): " + str(e)) 3498 3499 args = {'foo': 'bar'} 3500 try: 3501 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv')) 3502 raise Exception("Invalid ServiceDiscoveryResponse accepted") 3503 except dbus.exceptions.DBusException as e: 3504 if "InvalidArgs" not in str(e): 3505 raise Exception("Unexpected error message for invalid ServiceDiscoveryResponse(): " + str(e)) 3506 3507def test_dbus_p2p_service_discovery_query(dev, apdev): 3508 """D-Bus P2P service discovery query""" 3509 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3510 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3511 3512 addr0 = dev[0].p2p_dev_addr() 3513 dev[1].request("P2P_SERVICE_ADD bonjour 0b5f6166706f766572746370c00c000c01 074578616d706c65c027") 3514 dev[1].p2p_listen() 3515 addr1 = dev[1].p2p_dev_addr() 3516 3517 class TestDbusP2p(TestDbus): 3518 def __init__(self, bus): 3519 TestDbus.__init__(self, bus) 3520 self.done = False 3521 3522 def __enter__(self): 3523 gobject.timeout_add(1, self.run_test) 3524 gobject.timeout_add(15000, self.timeout) 3525 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3526 "DeviceFound") 3527 self.add_signal(self.serviceDiscoveryResponse, 3528 WPAS_DBUS_IFACE_P2PDEVICE, 3529 "ServiceDiscoveryResponse", byte_arrays=True) 3530 self.loop.run() 3531 return self 3532 3533 def deviceFound(self, path): 3534 logger.debug("deviceFound: path=%s" % path) 3535 args = {'peer_object': path, 3536 'tlv': dbus.ByteArray(b"\x02\x00\x00\x01")} 3537 p2p.ServiceDiscoveryRequest(args) 3538 3539 def serviceDiscoveryResponse(self, sd_request): 3540 logger.debug("serviceDiscoveryResponse: sd_request=%s" % str(sd_request)) 3541 self.done = True 3542 self.loop.quit() 3543 3544 def run_test(self, *args): 3545 logger.debug("run_test") 3546 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social', 3547 'Timeout': dbus.Int32(10)})) 3548 return False 3549 3550 def success(self): 3551 return self.done 3552 3553 with TestDbusP2p(bus) as t: 3554 if not t.success(): 3555 raise Exception("Expected signals not seen") 3556 3557 dev[1].p2p_stop_find() 3558 3559def test_dbus_p2p_service_discovery_external(dev, apdev): 3560 """D-Bus P2P service discovery with external response""" 3561 try: 3562 _test_dbus_p2p_service_discovery_external(dev, apdev) 3563 finally: 3564 dev[0].request("P2P_SERV_DISC_EXTERNAL 0") 3565 3566def _test_dbus_p2p_service_discovery_external(dev, apdev): 3567 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3568 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3569 3570 addr0 = dev[0].p2p_dev_addr() 3571 addr1 = dev[1].p2p_dev_addr() 3572 resp = "0300000101" 3573 3574 dev[1].request("P2P_FLUSH") 3575 dev[1].request("P2P_SERV_DISC_REQ " + addr0 + " 02000001") 3576 dev[1].p2p_find(social=True) 3577 3578 class TestDbusP2p(TestDbus): 3579 def __init__(self, bus): 3580 TestDbus.__init__(self, bus) 3581 self.sd = False 3582 3583 def __enter__(self): 3584 gobject.timeout_add(1, self.run_test) 3585 gobject.timeout_add(15000, self.timeout) 3586 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3587 "DeviceFound") 3588 self.add_signal(self.serviceDiscoveryRequest, 3589 WPAS_DBUS_IFACE_P2PDEVICE, 3590 "ServiceDiscoveryRequest") 3591 self.loop.run() 3592 return self 3593 3594 def deviceFound(self, path): 3595 logger.debug("deviceFound: path=%s" % path) 3596 3597 def serviceDiscoveryRequest(self, sd_request): 3598 logger.debug("serviceDiscoveryRequest: sd_request=%s" % str(sd_request)) 3599 self.sd = True 3600 args = {'peer_object': sd_request['peer_object'], 3601 'frequency': sd_request['frequency'], 3602 'dialog_token': sd_request['dialog_token'], 3603 'tlvs': dbus.ByteArray(binascii.unhexlify(resp))} 3604 p2p.ServiceDiscoveryResponse(dbus.Dictionary(args, signature='sv')) 3605 self.loop.quit() 3606 3607 def run_test(self, *args): 3608 logger.debug("run_test") 3609 p2p.ServiceDiscoveryExternal(1) 3610 p2p.ServiceUpdate() 3611 p2p.Listen(15) 3612 return False 3613 3614 def success(self): 3615 return self.sd 3616 3617 with TestDbusP2p(bus) as t: 3618 if not t.success(): 3619 raise Exception("Expected signals not seen") 3620 3621 ev = dev[1].wait_global_event(["P2P-SERV-DISC-RESP"], timeout=5) 3622 if ev is None: 3623 raise Exception("Service discovery timed out") 3624 if addr0 not in ev: 3625 raise Exception("Unexpected address in SD Response: " + ev) 3626 if ev.split(' ')[4] != resp: 3627 raise Exception("Unexpected response data SD Response: " + ev) 3628 dev[1].p2p_stop_find() 3629 3630 p2p.StopFind() 3631 p2p.ServiceDiscoveryExternal(0) 3632 3633def test_dbus_p2p_autogo(dev, apdev): 3634 """D-Bus P2P autonomous GO""" 3635 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3636 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3637 3638 addr0 = dev[0].p2p_dev_addr() 3639 3640 class TestDbusP2p(TestDbus): 3641 def __init__(self, bus): 3642 TestDbus.__init__(self, bus) 3643 self.first = True 3644 self.waiting_end = False 3645 self.exceptions = False 3646 self.deauthorized = False 3647 self.done = False 3648 3649 def __enter__(self): 3650 gobject.timeout_add(1, self.run_test) 3651 gobject.timeout_add(15000, self.timeout) 3652 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3653 "DeviceFound") 3654 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 3655 "GroupStarted") 3656 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 3657 "GroupFinished") 3658 self.add_signal(self.persistentGroupAdded, 3659 WPAS_DBUS_IFACE_P2PDEVICE, 3660 "PersistentGroupAdded") 3661 self.add_signal(self.persistentGroupRemoved, 3662 WPAS_DBUS_IFACE_P2PDEVICE, 3663 "PersistentGroupRemoved") 3664 self.add_signal(self.provisionDiscoveryRequestDisplayPin, 3665 WPAS_DBUS_IFACE_P2PDEVICE, 3666 "ProvisionDiscoveryRequestDisplayPin") 3667 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 3668 "StaAuthorized") 3669 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE, 3670 "StaDeauthorized") 3671 self.loop.run() 3672 return self 3673 3674 def groupStarted(self, properties): 3675 logger.debug("groupStarted: " + str(properties)) 3676 self.group = properties['group_object'] 3677 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 3678 properties['interface_object']) 3679 role = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role", 3680 dbus_interface=dbus.PROPERTIES_IFACE) 3681 if role != "GO": 3682 self.exceptions = True 3683 raise Exception("Unexpected role reported: " + role) 3684 group = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group", 3685 dbus_interface=dbus.PROPERTIES_IFACE) 3686 if group != properties['group_object']: 3687 self.exceptions = True 3688 raise Exception("Unexpected Group reported: " + str(group)) 3689 go = self.g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO", 3690 dbus_interface=dbus.PROPERTIES_IFACE) 3691 if go != '/': 3692 self.exceptions = True 3693 raise Exception("Unexpected PeerGO value: " + str(go)) 3694 if self.first: 3695 self.first = False 3696 logger.info("Remove persistent group instance") 3697 group_p2p = dbus.Interface(self.g_if_obj, 3698 WPAS_DBUS_IFACE_P2PDEVICE) 3699 group_p2p.Disconnect() 3700 else: 3701 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 3702 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join") 3703 3704 def groupFinished(self, properties): 3705 logger.debug("groupFinished: " + str(properties)) 3706 if self.waiting_end: 3707 logger.info("Remove persistent group") 3708 p2p.RemovePersistentGroup(self.persistent) 3709 else: 3710 logger.info("Re-start persistent group") 3711 params = dbus.Dictionary({'persistent_group_object': 3712 self.persistent, 3713 'frequency': 2412}) 3714 p2p.GroupAdd(params) 3715 3716 def persistentGroupAdded(self, path, properties): 3717 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties))) 3718 self.persistent = path 3719 3720 def persistentGroupRemoved(self, path): 3721 logger.debug("persistentGroupRemoved: %s" % path) 3722 self.done = True 3723 self.loop.quit() 3724 3725 def deviceFound(self, path): 3726 logger.debug("deviceFound: path=%s" % path) 3727 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 3728 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3729 dbus_interface=dbus.PROPERTIES_IFACE, 3730 byte_arrays=True) 3731 logger.debug('peer properties: ' + str(self.peer)) 3732 3733 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin): 3734 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin)) 3735 self.peer_path = peer_object 3736 peer = binascii.unhexlify(peer_object.split('/')[-1]) 3737 addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)]) 3738 3739 params = {'Role': 'registrar', 3740 'P2PDeviceAddress': self.peer['DeviceAddress'], 3741 'Bssid': self.peer['DeviceAddress'], 3742 'Type': 'pin'} 3743 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS) 3744 try: 3745 wps.Start(params) 3746 self.exceptions = True 3747 raise Exception("Invalid WPS.Start() accepted") 3748 except dbus.exceptions.DBusException as e: 3749 if "InvalidArgs" not in str(e): 3750 self.exceptions = True 3751 raise Exception("Unexpected error message: " + str(e)) 3752 params = {'Role': 'registrar', 3753 'P2PDeviceAddress': self.peer['DeviceAddress'], 3754 'Type': 'pin', 3755 'Pin': '12345670'} 3756 logger.info("Authorize peer to connect to the group") 3757 wps.Start(params) 3758 3759 def staAuthorized(self, name): 3760 logger.debug("staAuthorized: " + name) 3761 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, self.peer_path) 3762 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3763 dbus_interface=dbus.PROPERTIES_IFACE, 3764 byte_arrays=True) 3765 logger.debug("Peer properties: " + str(res)) 3766 if 'Groups' not in res or len(res['Groups']) != 1: 3767 self.exceptions = True 3768 raise Exception("Unexpected number of peer Groups entries") 3769 if res['Groups'][0] != self.group: 3770 self.exceptions = True 3771 raise Exception("Unexpected peer Groups[0] value") 3772 3773 g_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group) 3774 res = g_obj.GetAll(WPAS_DBUS_GROUP, 3775 dbus_interface=dbus.PROPERTIES_IFACE, 3776 byte_arrays=True) 3777 logger.debug("Group properties: " + str(res)) 3778 if 'Members' not in res or len(res['Members']) != 1: 3779 self.exceptions = True 3780 raise Exception("Unexpected number of group members") 3781 3782 ext = dbus.ByteArray(b"\x11\x22\x33\x44") 3783 # Earlier implementation of this interface was a bit strange. The 3784 # property is defined to have aay signature and that is what the 3785 # getter returned. However, the setter expected there to be a 3786 # dictionary with 'WPSVendorExtensions' as the key surrounding these 3787 # values.. The current implementations maintains support for that 3788 # for backwards compability reasons. Verify that encoding first. 3789 vals = dbus.Dictionary({'WPSVendorExtensions': [ext]}, 3790 signature='sv') 3791 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3792 dbus_interface=dbus.PROPERTIES_IFACE) 3793 res = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions', 3794 dbus_interface=dbus.PROPERTIES_IFACE, 3795 byte_arrays=True) 3796 if len(res) != 1: 3797 self.exceptions = True 3798 raise Exception("Unexpected number of vendor extensions") 3799 if res[0] != ext: 3800 self.exceptions = True 3801 raise Exception("Vendor extension value changed") 3802 3803 # And now verify that the more appropriate encoding is accepted as 3804 # well. 3805 res.append(dbus.ByteArray(b'\xaa\xbb\xcc\xdd\xee\xff')) 3806 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, 3807 dbus_interface=dbus.PROPERTIES_IFACE) 3808 res2 = g_obj.Get(WPAS_DBUS_GROUP, 'WPSVendorExtensions', 3809 dbus_interface=dbus.PROPERTIES_IFACE, 3810 byte_arrays=True) 3811 if len(res) != 2: 3812 self.exceptions = True 3813 raise Exception("Unexpected number of vendor extensions") 3814 if res[0] != res2[0] or res[1] != res2[1]: 3815 self.exceptions = True 3816 raise Exception("Vendor extension value changed") 3817 3818 for i in range(10): 3819 res.append(dbus.ByteArray(b'\xaa\xbb')) 3820 try: 3821 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, 3822 dbus_interface=dbus.PROPERTIES_IFACE) 3823 self.exceptions = True 3824 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3825 except dbus.exceptions.DBusException as e: 3826 if "Error.Failed" not in str(e): 3827 self.exceptions = True 3828 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3829 3830 vals = dbus.Dictionary({'Foo': [ext]}, signature='sv') 3831 try: 3832 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3833 dbus_interface=dbus.PROPERTIES_IFACE) 3834 self.exceptions = True 3835 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3836 except dbus.exceptions.DBusException as e: 3837 if "InvalidArgs" not in str(e): 3838 self.exceptions = True 3839 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3840 3841 vals = ["foo"] 3842 try: 3843 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3844 dbus_interface=dbus.PROPERTIES_IFACE) 3845 self.exceptions = True 3846 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3847 except dbus.exceptions.DBusException as e: 3848 if "Error.Failed" not in str(e): 3849 self.exceptions = True 3850 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3851 3852 vals = [["foo"]] 3853 try: 3854 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', vals, 3855 dbus_interface=dbus.PROPERTIES_IFACE) 3856 self.exceptions = True 3857 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 3858 except dbus.exceptions.DBusException as e: 3859 if "Error.Failed" not in str(e): 3860 self.exceptions = True 3861 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 3862 3863 p2p.RemoveClient({'peer': self.peer_path}) 3864 3865 self.waiting_end = True 3866 group_p2p = dbus.Interface(self.g_if_obj, 3867 WPAS_DBUS_IFACE_P2PDEVICE) 3868 group_p2p.Disconnect() 3869 3870 def staDeauthorized(self, name): 3871 logger.debug("staDeauthorized: " + name) 3872 self.deauthorized = True 3873 3874 def run_test(self, *args): 3875 logger.debug("run_test") 3876 params = dbus.Dictionary({'persistent': True, 3877 'frequency': 2412}) 3878 logger.info("Add a persistent group") 3879 p2p.GroupAdd(params) 3880 return False 3881 3882 def success(self): 3883 return self.done and self.deauthorized and not self.exceptions 3884 3885 with TestDbusP2p(bus) as t: 3886 if not t.success(): 3887 raise Exception("Expected signals not seen") 3888 3889 dev[1].wait_go_ending_session() 3890 3891def test_dbus_p2p_autogo_pbc(dev, apdev): 3892 """D-Bus P2P autonomous GO and PBC""" 3893 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3894 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3895 3896 addr0 = dev[0].p2p_dev_addr() 3897 3898 class TestDbusP2p(TestDbus): 3899 def __init__(self, bus): 3900 TestDbus.__init__(self, bus) 3901 self.first = True 3902 self.waiting_end = False 3903 self.done = False 3904 3905 def __enter__(self): 3906 gobject.timeout_add(1, self.run_test) 3907 gobject.timeout_add(15000, self.timeout) 3908 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 3909 "DeviceFound") 3910 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 3911 "GroupStarted") 3912 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 3913 "GroupFinished") 3914 self.add_signal(self.provisionDiscoveryPBCRequest, 3915 WPAS_DBUS_IFACE_P2PDEVICE, 3916 "ProvisionDiscoveryPBCRequest") 3917 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 3918 "StaAuthorized") 3919 self.loop.run() 3920 return self 3921 3922 def groupStarted(self, properties): 3923 logger.debug("groupStarted: " + str(properties)) 3924 self.group = properties['group_object'] 3925 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 3926 properties['interface_object']) 3927 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 3928 dev1.global_request("P2P_CONNECT " + addr0 + " pbc join") 3929 3930 def groupFinished(self, properties): 3931 logger.debug("groupFinished: " + str(properties)) 3932 self.done = True 3933 self.loop.quit() 3934 3935 def deviceFound(self, path): 3936 logger.debug("deviceFound: path=%s" % path) 3937 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 3938 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 3939 dbus_interface=dbus.PROPERTIES_IFACE, 3940 byte_arrays=True) 3941 logger.debug('peer properties: ' + str(self.peer)) 3942 3943 def provisionDiscoveryPBCRequest(self, peer_object): 3944 logger.debug("provisionDiscoveryPBCRequest - peer=%s" % peer_object) 3945 self.peer_path = peer_object 3946 peer = binascii.unhexlify(peer_object.split('/')[-1]) 3947 addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)]) 3948 params = {'Role': 'registrar', 3949 'P2PDeviceAddress': self.peer['DeviceAddress'], 3950 'Type': 'pbc'} 3951 logger.info("Authorize peer to connect to the group") 3952 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS) 3953 wps.Start(params) 3954 3955 def staAuthorized(self, name): 3956 logger.debug("staAuthorized: " + name) 3957 group_p2p = dbus.Interface(self.g_if_obj, 3958 WPAS_DBUS_IFACE_P2PDEVICE) 3959 group_p2p.Disconnect() 3960 3961 def run_test(self, *args): 3962 logger.debug("run_test") 3963 params = dbus.Dictionary({'frequency': 2412}) 3964 p2p.GroupAdd(params) 3965 return False 3966 3967 def success(self): 3968 return self.done 3969 3970 with TestDbusP2p(bus) as t: 3971 if not t.success(): 3972 raise Exception("Expected signals not seen") 3973 3974 dev[1].wait_go_ending_session() 3975 dev[1].flush_scan_cache() 3976 3977def test_dbus_p2p_autogo_legacy(dev, apdev): 3978 """D-Bus P2P autonomous GO and legacy STA""" 3979 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 3980 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 3981 3982 addr0 = dev[0].p2p_dev_addr() 3983 3984 class TestDbusP2p(TestDbus): 3985 def __init__(self, bus): 3986 TestDbus.__init__(self, bus) 3987 self.done = False 3988 3989 def __enter__(self): 3990 gobject.timeout_add(1, self.run_test) 3991 gobject.timeout_add(15000, self.timeout) 3992 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 3993 "GroupStarted") 3994 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 3995 "GroupFinished") 3996 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 3997 "StaAuthorized") 3998 self.loop.run() 3999 return self 4000 4001 def groupStarted(self, properties): 4002 logger.debug("groupStarted: " + str(properties)) 4003 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 4004 properties['group_object']) 4005 res = g_obj.GetAll(WPAS_DBUS_GROUP, 4006 dbus_interface=dbus.PROPERTIES_IFACE, 4007 byte_arrays=True) 4008 bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])]) 4009 4010 pin = '12345670' 4011 params = {'Role': 'enrollee', 4012 'Type': 'pin', 4013 'Pin': pin} 4014 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4015 properties['interface_object']) 4016 wps = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_WPS) 4017 wps.Start(params) 4018 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4019 dev1.scan_for_bss(bssid, freq=2412) 4020 dev1.request("WPS_PIN " + bssid + " " + pin) 4021 self.group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4022 4023 def groupFinished(self, properties): 4024 logger.debug("groupFinished: " + str(properties)) 4025 self.done = True 4026 self.loop.quit() 4027 4028 def staAuthorized(self, name): 4029 logger.debug("staAuthorized: " + name) 4030 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4031 dev1.request("DISCONNECT") 4032 self.group_p2p.Disconnect() 4033 4034 def run_test(self, *args): 4035 logger.debug("run_test") 4036 params = dbus.Dictionary({'frequency': 2412}) 4037 p2p.GroupAdd(params) 4038 return False 4039 4040 def success(self): 4041 return self.done 4042 4043 with TestDbusP2p(bus) as t: 4044 if not t.success(): 4045 raise Exception("Expected signals not seen") 4046 4047def test_dbus_p2p_join(dev, apdev): 4048 """D-Bus P2P join an autonomous GO""" 4049 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4050 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4051 4052 addr1 = dev[1].p2p_dev_addr() 4053 addr2 = dev[2].p2p_dev_addr() 4054 dev[1].p2p_start_go(freq=2412) 4055 dev1_group_ifname = dev[1].group_ifname 4056 dev[2].p2p_listen() 4057 4058 class TestDbusP2p(TestDbus): 4059 def __init__(self, bus): 4060 TestDbus.__init__(self, bus) 4061 self.done = False 4062 self.peer = None 4063 self.go = None 4064 4065 def __enter__(self): 4066 gobject.timeout_add(1, self.run_test) 4067 gobject.timeout_add(15000, self.timeout) 4068 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4069 "DeviceFound") 4070 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4071 "GroupStarted") 4072 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4073 "GroupFinished") 4074 self.add_signal(self.invitationResult, WPAS_DBUS_IFACE_P2PDEVICE, 4075 "InvitationResult") 4076 self.loop.run() 4077 return self 4078 4079 def deviceFound(self, path): 4080 logger.debug("deviceFound: path=%s" % path) 4081 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 4082 res = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 4083 dbus_interface=dbus.PROPERTIES_IFACE, 4084 byte_arrays=True) 4085 logger.debug('peer properties: ' + str(res)) 4086 if addr2.replace(':', '') in path: 4087 self.peer = path 4088 elif addr1.replace(':', '') in path: 4089 self.go = path 4090 if self.peer and self.go: 4091 logger.info("Join the group") 4092 p2p.StopFind() 4093 args = {'peer': self.go, 4094 'join': True, 4095 'wps_method': 'pin', 4096 'frequency': 2412} 4097 pin = p2p.Connect(args) 4098 4099 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4100 dev1.group_ifname = dev1_group_ifname 4101 dev1.group_request("WPS_PIN any " + pin) 4102 4103 def groupStarted(self, properties): 4104 logger.debug("groupStarted: " + str(properties)) 4105 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4106 properties['interface_object']) 4107 role = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Role", 4108 dbus_interface=dbus.PROPERTIES_IFACE) 4109 if role != "client": 4110 raise Exception("Unexpected role reported: " + role) 4111 group = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "Group", 4112 dbus_interface=dbus.PROPERTIES_IFACE) 4113 if group != properties['group_object']: 4114 raise Exception("Unexpected Group reported: " + str(group)) 4115 go = g_if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PeerGO", 4116 dbus_interface=dbus.PROPERTIES_IFACE) 4117 if go != self.go: 4118 raise Exception("Unexpected PeerGO value: " + str(go)) 4119 4120 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 4121 properties['group_object']) 4122 res = g_obj.GetAll(WPAS_DBUS_GROUP, 4123 dbus_interface=dbus.PROPERTIES_IFACE, 4124 byte_arrays=True) 4125 logger.debug("Group properties: " + str(res)) 4126 4127 ext = dbus.ByteArray(b"\x11\x22\x33\x44") 4128 try: 4129 # Set(WPSVendorExtensions) not allowed for P2P Client 4130 g_obj.Set(WPAS_DBUS_GROUP, 'WPSVendorExtensions', res, 4131 dbus_interface=dbus.PROPERTIES_IFACE) 4132 raise Exception("Invalid Set(WPSVendorExtensions) accepted") 4133 except dbus.exceptions.DBusException as e: 4134 if "Error.Failed: Failed to set property" not in str(e): 4135 raise Exception("Unexpected error message for invalid Set(WPSVendorExtensions): " + str(e)) 4136 4137 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4138 args = {'duration1': 30000, 'interval1': 102400, 4139 'duration2': 20000, 'interval2': 102400} 4140 group_p2p.PresenceRequest(args) 4141 4142 args = {'peer': self.peer} 4143 group_p2p.Invite(args) 4144 4145 def groupFinished(self, properties): 4146 logger.debug("groupFinished: " + str(properties)) 4147 self.done = True 4148 self.loop.quit() 4149 4150 def invitationResult(self, result): 4151 logger.debug("invitationResult: " + str(result)) 4152 if result['status'] != 1: 4153 raise Exception("Unexpected invitation result: " + str(result)) 4154 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4155 dev1.group_ifname = dev1_group_ifname 4156 dev1.remove_group() 4157 4158 def run_test(self, *args): 4159 logger.debug("run_test") 4160 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4161 return False 4162 4163 def success(self): 4164 return self.done 4165 4166 with TestDbusP2p(bus) as t: 4167 if not t.success(): 4168 raise Exception("Expected signals not seen") 4169 4170 dev[2].p2p_stop_find() 4171 4172def test_dbus_p2p_invitation_received(dev, apdev): 4173 """D-Bus P2P and InvitationReceived""" 4174 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4175 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4176 4177 form(dev[0], dev[1]) 4178 addr0 = dev[0].p2p_dev_addr() 4179 dev[0].p2p_listen() 4180 dev[0].global_request("SET persistent_reconnect 0") 4181 4182 if not dev[1].discover_peer(addr0, social=True): 4183 raise Exception("Peer " + addr0 + " not found") 4184 peer = dev[1].get_peer(addr0) 4185 4186 class TestDbusP2p(TestDbus): 4187 def __init__(self, bus): 4188 TestDbus.__init__(self, bus) 4189 self.done = False 4190 4191 def __enter__(self): 4192 gobject.timeout_add(1, self.run_test) 4193 gobject.timeout_add(15000, self.timeout) 4194 self.add_signal(self.invitationReceived, WPAS_DBUS_IFACE_P2PDEVICE, 4195 "InvitationReceived") 4196 self.loop.run() 4197 return self 4198 4199 def invitationReceived(self, result): 4200 logger.debug("invitationReceived: " + str(result)) 4201 self.done = True 4202 self.loop.quit() 4203 4204 def run_test(self, *args): 4205 logger.debug("run_test") 4206 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4207 cmd = "P2P_INVITE persistent=" + peer['persistent'] + " peer=" + addr0 4208 dev1.global_request(cmd) 4209 return False 4210 4211 def success(self): 4212 return self.done 4213 4214 with TestDbusP2p(bus) as t: 4215 if not t.success(): 4216 raise Exception("Expected signals not seen") 4217 4218 dev[0].p2p_stop_find() 4219 dev[1].p2p_stop_find() 4220 4221def test_dbus_p2p_config(dev, apdev): 4222 """D-Bus Get/Set P2PDeviceConfig""" 4223 try: 4224 _test_dbus_p2p_config(dev, apdev) 4225 finally: 4226 dev[0].request("P2P_SET ssid_postfix ") 4227 4228def _test_dbus_p2p_config(dev, apdev): 4229 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4230 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4231 4232 res = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4233 dbus_interface=dbus.PROPERTIES_IFACE, 4234 byte_arrays=True) 4235 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", res, 4236 dbus_interface=dbus.PROPERTIES_IFACE) 4237 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4238 dbus_interface=dbus.PROPERTIES_IFACE, 4239 byte_arrays=True) 4240 4241 if len(res) != len(res2): 4242 raise Exception("Different number of parameters") 4243 for k in res: 4244 if res[k] != res2[k]: 4245 raise Exception("Parameter %s value changes" % k) 4246 4247 changes = {'SsidPostfix': 'foo', 4248 'VendorExtension': [dbus.ByteArray(b'\x11\x22\x33\x44')], 4249 'SecondaryDeviceTypes': [dbus.ByteArray(b'\x11\x22\x33\x44\x55\x66\x77\x88')]} 4250 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4251 dbus.Dictionary(changes, signature='sv'), 4252 dbus_interface=dbus.PROPERTIES_IFACE) 4253 4254 res2 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4255 dbus_interface=dbus.PROPERTIES_IFACE, 4256 byte_arrays=True) 4257 logger.debug("P2PDeviceConfig: " + str(res2)) 4258 if 'VendorExtension' not in res2 or len(res2['VendorExtension']) != 1: 4259 raise Exception("VendorExtension does not match") 4260 if 'SecondaryDeviceTypes' not in res2 or len(res2['SecondaryDeviceTypes']) != 1: 4261 raise Exception("SecondaryDeviceType does not match") 4262 4263 changes = {'SsidPostfix': '', 4264 'VendorExtension': dbus.Array([], signature="ay"), 4265 'SecondaryDeviceTypes': dbus.Array([], signature="ay")} 4266 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4267 dbus.Dictionary(changes, signature='sv'), 4268 dbus_interface=dbus.PROPERTIES_IFACE) 4269 4270 res3 = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4271 dbus_interface=dbus.PROPERTIES_IFACE, 4272 byte_arrays=True) 4273 logger.debug("P2PDeviceConfig: " + str(res3)) 4274 if 'VendorExtension' in res3: 4275 raise Exception("VendorExtension not removed") 4276 if 'SecondaryDeviceTypes' in res3: 4277 raise Exception("SecondaryDeviceType not removed") 4278 4279 try: 4280 dev[0].request("P2P_SET disabled 1") 4281 if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4282 dbus_interface=dbus.PROPERTIES_IFACE, 4283 byte_arrays=True) 4284 raise Exception("Invalid Get(P2PDeviceConfig) accepted") 4285 except dbus.exceptions.DBusException as e: 4286 if "Error.Failed: P2P is not available for this interface" not in str(e): 4287 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4288 finally: 4289 dev[0].request("P2P_SET disabled 0") 4290 4291 try: 4292 dev[0].request("P2P_SET disabled 1") 4293 changes = {'SsidPostfix': 'foo'} 4294 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4295 dbus.Dictionary(changes, signature='sv'), 4296 dbus_interface=dbus.PROPERTIES_IFACE) 4297 raise Exception("Invalid Set(P2PDeviceConfig) accepted") 4298 except dbus.exceptions.DBusException as e: 4299 if "Error.Failed: P2P is not available for this interface" not in str(e): 4300 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4301 finally: 4302 dev[0].request("P2P_SET disabled 0") 4303 4304 tests = [{'DeviceName': 123}, 4305 {'SsidPostfix': 123}, 4306 {'Foo': 'Bar'}] 4307 for changes in tests: 4308 try: 4309 if_obj.Set(WPAS_DBUS_IFACE_P2PDEVICE, "P2PDeviceConfig", 4310 dbus.Dictionary(changes, signature='sv'), 4311 dbus_interface=dbus.PROPERTIES_IFACE) 4312 raise Exception("Invalid Set(P2PDeviceConfig) accepted") 4313 except dbus.exceptions.DBusException as e: 4314 if "InvalidArgs" not in str(e): 4315 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4316 4317def test_dbus_p2p_persistent(dev, apdev): 4318 """D-Bus P2P persistent group""" 4319 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4320 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4321 4322 class TestDbusP2p(TestDbus): 4323 def __init__(self, bus): 4324 TestDbus.__init__(self, bus) 4325 4326 def __enter__(self): 4327 gobject.timeout_add(1, self.run_test) 4328 gobject.timeout_add(15000, self.timeout) 4329 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4330 "GroupStarted") 4331 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4332 "GroupFinished") 4333 self.add_signal(self.persistentGroupAdded, 4334 WPAS_DBUS_IFACE_P2PDEVICE, 4335 "PersistentGroupAdded") 4336 self.loop.run() 4337 return self 4338 4339 def groupStarted(self, properties): 4340 logger.debug("groupStarted: " + str(properties)) 4341 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4342 properties['interface_object']) 4343 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4344 group_p2p.Disconnect() 4345 4346 def groupFinished(self, properties): 4347 logger.debug("groupFinished: " + str(properties)) 4348 self.loop.quit() 4349 4350 def persistentGroupAdded(self, path, properties): 4351 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties))) 4352 self.persistent = path 4353 4354 def run_test(self, *args): 4355 logger.debug("run_test") 4356 params = dbus.Dictionary({'persistent': True, 4357 'frequency': 2412}) 4358 logger.info("Add a persistent group") 4359 p2p.GroupAdd(params) 4360 return False 4361 4362 def success(self): 4363 return True 4364 4365 with TestDbusP2p(bus) as t: 4366 if not t.success(): 4367 raise Exception("Expected signals not seen") 4368 persistent = t.persistent 4369 4370 p_obj = bus.get_object(WPAS_DBUS_SERVICE, persistent) 4371 res = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties", 4372 dbus_interface=dbus.PROPERTIES_IFACE, byte_arrays=True) 4373 logger.info("Persistent group Properties: " + str(res)) 4374 vals = dbus.Dictionary({'ssid': 'DIRECT-foo'}, signature='sv') 4375 p_obj.Set(WPAS_DBUS_PERSISTENT_GROUP, "Properties", vals, 4376 dbus_interface=dbus.PROPERTIES_IFACE) 4377 res2 = p_obj.Get(WPAS_DBUS_PERSISTENT_GROUP, "Properties", 4378 dbus_interface=dbus.PROPERTIES_IFACE) 4379 if len(res) != len(res2): 4380 raise Exception("Different number of parameters") 4381 for k in res: 4382 if k != 'ssid' and res[k] != res2[k]: 4383 raise Exception("Parameter %s value changes" % k) 4384 if res2['ssid'] != '"DIRECT-foo"': 4385 raise Exception("Unexpected ssid") 4386 4387 args = dbus.Dictionary({'ssid': 'DIRECT-testing', 4388 'psk': '1234567890'}, signature='sv') 4389 group = p2p.AddPersistentGroup(args) 4390 4391 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups", 4392 dbus_interface=dbus.PROPERTIES_IFACE) 4393 if len(groups) != 2: 4394 raise Exception("Unexpected number of persistent groups: " + str(groups)) 4395 4396 p2p.RemoveAllPersistentGroups() 4397 4398 groups = if_obj.Get(WPAS_DBUS_IFACE_P2PDEVICE, "PersistentGroups", 4399 dbus_interface=dbus.PROPERTIES_IFACE) 4400 if len(groups) != 0: 4401 raise Exception("Unexpected number of persistent groups: " + str(groups)) 4402 4403 try: 4404 p2p.RemovePersistentGroup(persistent) 4405 raise Exception("Invalid RemovePersistentGroup accepted") 4406 except dbus.exceptions.DBusException as e: 4407 if "NetworkUnknown: There is no such persistent group" not in str(e): 4408 raise Exception("Unexpected error message for invalid RemovePersistentGroup: " + str(e)) 4409 4410def test_dbus_p2p_reinvoke_persistent(dev, apdev): 4411 """D-Bus P2P reinvoke persistent group""" 4412 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4413 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4414 4415 addr0 = dev[0].p2p_dev_addr() 4416 4417 class TestDbusP2p(TestDbus): 4418 def __init__(self, bus): 4419 TestDbus.__init__(self, bus) 4420 self.first = True 4421 self.waiting_end = False 4422 self.done = False 4423 self.invited = False 4424 4425 def __enter__(self): 4426 gobject.timeout_add(1, self.run_test) 4427 gobject.timeout_add(15000, self.timeout) 4428 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4429 "DeviceFound") 4430 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4431 "GroupStarted") 4432 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4433 "GroupFinished") 4434 self.add_signal(self.persistentGroupAdded, 4435 WPAS_DBUS_IFACE_P2PDEVICE, 4436 "PersistentGroupAdded") 4437 self.add_signal(self.provisionDiscoveryRequestDisplayPin, 4438 WPAS_DBUS_IFACE_P2PDEVICE, 4439 "ProvisionDiscoveryRequestDisplayPin") 4440 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 4441 "StaAuthorized") 4442 self.loop.run() 4443 return self 4444 4445 def groupStarted(self, properties): 4446 logger.debug("groupStarted: " + str(properties)) 4447 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4448 properties['interface_object']) 4449 if not self.invited: 4450 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 4451 properties['group_object']) 4452 res = g_obj.GetAll(WPAS_DBUS_GROUP, 4453 dbus_interface=dbus.PROPERTIES_IFACE, 4454 byte_arrays=True) 4455 bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', res['BSSID'])]) 4456 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4457 dev1.scan_for_bss(bssid, freq=2412) 4458 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 join") 4459 4460 def groupFinished(self, properties): 4461 logger.debug("groupFinished: " + str(properties)) 4462 if self.invited: 4463 self.done = True 4464 self.loop.quit() 4465 else: 4466 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4467 dev1.global_request("SET persistent_reconnect 1") 4468 dev1.p2p_listen() 4469 4470 args = {'persistent_group_object': dbus.ObjectPath(path), 4471 'peer': self.peer_path} 4472 try: 4473 pin = p2p.Invite(args) 4474 raise Exception("Invalid Invite accepted") 4475 except dbus.exceptions.DBusException as e: 4476 if "InvalidArgs" not in str(e): 4477 raise Exception("Unexpected error message for invalid Invite: " + str(e)) 4478 4479 args = {'persistent_group_object': self.persistent, 4480 'peer': self.peer_path} 4481 pin = p2p.Invite(args) 4482 self.invited = True 4483 4484 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], 4485 timeout=15) 4486 if self.sta_group_ev is None: 4487 raise Exception("P2P-GROUP-STARTED event not seen") 4488 4489 def persistentGroupAdded(self, path, properties): 4490 logger.debug("persistentGroupAdded: %s %s" % (path, str(properties))) 4491 self.persistent = path 4492 4493 def deviceFound(self, path): 4494 logger.debug("deviceFound: path=%s" % path) 4495 peer_obj = bus.get_object(WPAS_DBUS_SERVICE, path) 4496 self.peer = peer_obj.GetAll(WPAS_DBUS_P2P_PEER, 4497 dbus_interface=dbus.PROPERTIES_IFACE, 4498 byte_arrays=True) 4499 4500 def provisionDiscoveryRequestDisplayPin(self, peer_object, pin): 4501 logger.debug("provisionDiscoveryRequestDisplayPin - peer=%s pin=%s" % (peer_object, pin)) 4502 self.peer_path = peer_object 4503 peer = binascii.unhexlify(peer_object.split('/')[-1]) 4504 addr = ':'.join(["%02x" % i for i in struct.unpack('6B', peer)]) 4505 params = {'Role': 'registrar', 4506 'P2PDeviceAddress': self.peer['DeviceAddress'], 4507 'Bssid': self.peer['DeviceAddress'], 4508 'Type': 'pin', 4509 'Pin': '12345670'} 4510 logger.info("Authorize peer to connect to the group") 4511 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4512 wps = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_WPS) 4513 wps.Start(params) 4514 self.sta_group_ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], 4515 timeout=15) 4516 if self.sta_group_ev is None: 4517 raise Exception("P2P-GROUP-STARTED event not seen") 4518 4519 def staAuthorized(self, name): 4520 logger.debug("staAuthorized: " + name) 4521 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4522 dev1.group_form_result(self.sta_group_ev) 4523 dev1.remove_group() 4524 ev = dev1.wait_global_event(["P2P-GROUP-REMOVED"], timeout=10) 4525 if ev is None: 4526 raise Exception("Group removal timed out") 4527 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4528 group_p2p.Disconnect() 4529 4530 def run_test(self, *args): 4531 logger.debug("run_test") 4532 params = dbus.Dictionary({'persistent': True, 4533 'frequency': 2412}) 4534 logger.info("Add a persistent group") 4535 p2p.GroupAdd(params) 4536 return False 4537 4538 def success(self): 4539 return self.done 4540 4541 with TestDbusP2p(bus) as t: 4542 if not t.success(): 4543 raise Exception("Expected signals not seen") 4544 4545def test_dbus_p2p_go_neg_rx(dev, apdev): 4546 """D-Bus P2P GO Negotiation receive""" 4547 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4548 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4549 addr0 = dev[0].p2p_dev_addr() 4550 4551 class TestDbusP2p(TestDbus): 4552 def __init__(self, bus): 4553 TestDbus.__init__(self, bus) 4554 self.done = False 4555 4556 def __enter__(self): 4557 gobject.timeout_add(1, self.run_test) 4558 gobject.timeout_add(15000, self.timeout) 4559 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4560 "DeviceFound") 4561 self.add_signal(self.goNegotiationRequest, 4562 WPAS_DBUS_IFACE_P2PDEVICE, 4563 "GONegotiationRequest", 4564 byte_arrays=True) 4565 self.add_signal(self.goNegotiationSuccess, 4566 WPAS_DBUS_IFACE_P2PDEVICE, 4567 "GONegotiationSuccess", 4568 byte_arrays=True) 4569 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4570 "GroupStarted") 4571 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4572 "GroupFinished") 4573 self.loop.run() 4574 return self 4575 4576 def deviceFound(self, path): 4577 logger.debug("deviceFound: path=%s" % path) 4578 4579 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0): 4580 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent)) 4581 if dev_passwd_id != 1: 4582 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id) 4583 args = {'peer': path, 'wps_method': 'display', 'pin': '12345670', 4584 'go_intent': 15, 'persistent': False, 'frequency': 5175} 4585 try: 4586 p2p.Connect(args) 4587 raise Exception("Invalid Connect accepted") 4588 except dbus.exceptions.DBusException as e: 4589 if "ConnectChannelUnsupported" not in str(e): 4590 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 4591 4592 args = {'peer': path, 'wps_method': 'display', 'pin': '12345670', 4593 'go_intent': 15, 'persistent': False} 4594 p2p.Connect(args) 4595 4596 def goNegotiationSuccess(self, properties): 4597 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4598 4599 def groupStarted(self, properties): 4600 logger.debug("groupStarted: " + str(properties)) 4601 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4602 properties['interface_object']) 4603 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4604 group_p2p.Disconnect() 4605 4606 def groupFinished(self, properties): 4607 logger.debug("groupFinished: " + str(properties)) 4608 self.done = True 4609 self.loop.quit() 4610 4611 def run_test(self, *args): 4612 logger.debug("run_test") 4613 p2p.Listen(10) 4614 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4615 if not dev1.discover_peer(addr0): 4616 raise Exception("Peer not found") 4617 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 enter") 4618 return False 4619 4620 def success(self): 4621 return self.done 4622 4623 with TestDbusP2p(bus) as t: 4624 if not t.success(): 4625 raise Exception("Expected signals not seen") 4626 4627def test_dbus_p2p_go_neg_auth(dev, apdev): 4628 """D-Bus P2P GO Negotiation authorized""" 4629 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4630 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4631 addr0 = dev[0].p2p_dev_addr() 4632 dev[1].p2p_listen() 4633 4634 class TestDbusP2p(TestDbus): 4635 def __init__(self, bus): 4636 TestDbus.__init__(self, bus) 4637 self.done = False 4638 self.peer_joined = False 4639 self.peer_disconnected = False 4640 4641 def __enter__(self): 4642 gobject.timeout_add(1, self.run_test) 4643 gobject.timeout_add(15000, self.timeout) 4644 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4645 "DeviceFound") 4646 self.add_signal(self.goNegotiationSuccess, 4647 WPAS_DBUS_IFACE_P2PDEVICE, 4648 "GONegotiationSuccess", 4649 byte_arrays=True) 4650 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4651 "GroupStarted") 4652 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4653 "GroupFinished") 4654 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE, 4655 "StaDeauthorized") 4656 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP, 4657 "PeerJoined") 4658 self.add_signal(self.peerDisconnected, WPAS_DBUS_GROUP, 4659 "PeerDisconnected") 4660 self.loop.run() 4661 return self 4662 4663 def deviceFound(self, path): 4664 logger.debug("deviceFound: path=%s" % path) 4665 args = {'peer': path, 'wps_method': 'keypad', 4666 'go_intent': 15, 'authorize_only': True} 4667 try: 4668 p2p.Connect(args) 4669 raise Exception("Invalid Connect accepted") 4670 except dbus.exceptions.DBusException as e: 4671 if "InvalidArgs" not in str(e): 4672 raise Exception("Unexpected error message for invalid Connect: " + str(e)) 4673 4674 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4675 'go_intent': 15, 'authorize_only': True} 4676 p2p.Connect(args) 4677 p2p.Listen(10) 4678 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4679 if not dev1.discover_peer(addr0): 4680 raise Exception("Peer not found") 4681 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=0") 4682 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4683 if ev is None: 4684 raise Exception("Group formation timed out") 4685 self.sta_group_ev = ev 4686 4687 def goNegotiationSuccess(self, properties): 4688 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4689 4690 def groupStarted(self, properties): 4691 logger.debug("groupStarted: " + str(properties)) 4692 self.g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4693 properties['interface_object']) 4694 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4695 dev1.group_form_result(self.sta_group_ev) 4696 dev1.remove_group() 4697 4698 def staDeauthorized(self, name): 4699 logger.debug("staDeuthorized: " + name) 4700 group_p2p = dbus.Interface(self.g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4701 group_p2p.Disconnect() 4702 4703 def peerJoined(self, peer): 4704 logger.debug("peerJoined: " + peer) 4705 self.peer_joined = True 4706 4707 def peerDisconnected(self, peer): 4708 logger.debug("peerDisconnected: " + peer) 4709 self.peer_disconnected = True 4710 4711 def groupFinished(self, properties): 4712 logger.debug("groupFinished: " + str(properties)) 4713 self.done = True 4714 self.loop.quit() 4715 4716 def run_test(self, *args): 4717 logger.debug("run_test") 4718 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4719 return False 4720 4721 def success(self): 4722 return self.done and self.peer_joined and self.peer_disconnected 4723 4724 with TestDbusP2p(bus) as t: 4725 if not t.success(): 4726 raise Exception("Expected signals not seen") 4727 4728def test_dbus_p2p_go_neg_init(dev, apdev): 4729 """D-Bus P2P GO Negotiation initiation""" 4730 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4731 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4732 addr0 = dev[0].p2p_dev_addr() 4733 dev[1].p2p_listen() 4734 4735 class TestDbusP2p(TestDbus): 4736 def __init__(self, bus): 4737 TestDbus.__init__(self, bus) 4738 self.done = False 4739 self.peer_group_added = False 4740 self.peer_group_removed = False 4741 4742 def __enter__(self): 4743 gobject.timeout_add(1, self.run_test) 4744 gobject.timeout_add(15000, self.timeout) 4745 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4746 "DeviceFound") 4747 self.add_signal(self.goNegotiationSuccess, 4748 WPAS_DBUS_IFACE_P2PDEVICE, 4749 "GONegotiationSuccess", 4750 byte_arrays=True) 4751 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4752 "GroupStarted") 4753 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4754 "GroupFinished") 4755 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 4756 "PropertiesChanged") 4757 self.loop.run() 4758 return self 4759 4760 def deviceFound(self, path): 4761 logger.debug("deviceFound: path=%s" % path) 4762 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4763 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4764 'go_intent': 0} 4765 p2p.Connect(args) 4766 4767 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) 4768 if ev is None: 4769 raise Exception("Timeout while waiting for GO Neg Request") 4770 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") 4771 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4772 if ev is None: 4773 raise Exception("Group formation timed out") 4774 self.sta_group_ev = ev 4775 dev1.close_monitor_global() 4776 dev1.close_monitor_mon() 4777 dev1 = None 4778 4779 def goNegotiationSuccess(self, properties): 4780 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4781 4782 def groupStarted(self, properties): 4783 logger.debug("groupStarted: " + str(properties)) 4784 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4785 properties['interface_object']) 4786 group_p2p = dbus.Interface(g_if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4787 group_p2p.Disconnect() 4788 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False) 4789 dev1.group_form_result(self.sta_group_ev) 4790 dev1.remove_group() 4791 dev1 = None 4792 4793 def groupFinished(self, properties): 4794 logger.debug("groupFinished: " + str(properties)) 4795 self.done = True 4796 4797 def propertiesChanged(self, interface_name, changed_properties, 4798 invalidated_properties): 4799 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 4800 if interface_name != WPAS_DBUS_P2P_PEER: 4801 return 4802 if "Groups" not in changed_properties: 4803 return 4804 if len(changed_properties["Groups"]) > 0: 4805 self.peer_group_added = True 4806 if len(changed_properties["Groups"]) == 0: 4807 if not self.peer_group_added: 4808 # This is likely a leftover event from an earlier test case, 4809 # ignore it to allow this test case to go through its steps. 4810 logger.info("Ignore propertiesChanged indicating group removal before group has been added") 4811 return 4812 self.peer_group_removed = True 4813 self.loop.quit() 4814 4815 def run_test(self, *args): 4816 logger.debug("run_test") 4817 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4818 return False 4819 4820 def success(self): 4821 return self.done and self.peer_group_added and self.peer_group_removed 4822 4823 with TestDbusP2p(bus) as t: 4824 if not t.success(): 4825 raise Exception("Expected signals not seen") 4826 4827def test_dbus_p2p_group_termination_by_go(dev, apdev): 4828 """D-Bus P2P group removal on GO terminating the group""" 4829 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4830 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4831 addr0 = dev[0].p2p_dev_addr() 4832 dev[1].p2p_listen() 4833 4834 class TestDbusP2p(TestDbus): 4835 def __init__(self, bus): 4836 TestDbus.__init__(self, bus) 4837 self.done = False 4838 self.peer_group_added = False 4839 self.peer_group_removed = False 4840 4841 def __enter__(self): 4842 gobject.timeout_add(1, self.run_test) 4843 gobject.timeout_add(15000, self.timeout) 4844 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4845 "DeviceFound") 4846 self.add_signal(self.goNegotiationSuccess, 4847 WPAS_DBUS_IFACE_P2PDEVICE, 4848 "GONegotiationSuccess", 4849 byte_arrays=True) 4850 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4851 "GroupStarted") 4852 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4853 "GroupFinished") 4854 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 4855 "PropertiesChanged") 4856 self.loop.run() 4857 return self 4858 4859 def deviceFound(self, path): 4860 logger.debug("deviceFound: path=%s" % path) 4861 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4862 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4863 'go_intent': 0} 4864 p2p.Connect(args) 4865 4866 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) 4867 if ev is None: 4868 raise Exception("Timeout while waiting for GO Neg Request") 4869 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") 4870 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4871 if ev is None: 4872 raise Exception("Group formation timed out") 4873 self.sta_group_ev = ev 4874 dev1.close_monitor_global() 4875 dev1.close_monitor_mon() 4876 dev1 = None 4877 4878 def goNegotiationSuccess(self, properties): 4879 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4880 4881 def groupStarted(self, properties): 4882 logger.debug("groupStarted: " + str(properties)) 4883 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4884 properties['interface_object']) 4885 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False) 4886 dev1.group_form_result(self.sta_group_ev) 4887 dev1.remove_group() 4888 4889 def groupFinished(self, properties): 4890 logger.debug("groupFinished: " + str(properties)) 4891 self.done = True 4892 4893 def propertiesChanged(self, interface_name, changed_properties, 4894 invalidated_properties): 4895 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 4896 if interface_name != WPAS_DBUS_P2P_PEER: 4897 return 4898 if "Groups" not in changed_properties: 4899 return 4900 if len(changed_properties["Groups"]) > 0: 4901 self.peer_group_added = True 4902 if len(changed_properties["Groups"]) == 0 and self.peer_group_added: 4903 self.peer_group_removed = True 4904 self.loop.quit() 4905 4906 def run_test(self, *args): 4907 logger.debug("run_test") 4908 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 4909 return False 4910 4911 def success(self): 4912 return self.done and self.peer_group_added and self.peer_group_removed 4913 4914 with TestDbusP2p(bus) as t: 4915 if not t.success(): 4916 raise Exception("Expected signals not seen") 4917 4918def test_dbus_p2p_group_idle_timeout(dev, apdev): 4919 """D-Bus P2P group removal on idle timeout""" 4920 try: 4921 dev[0].global_request("SET p2p_group_idle 1") 4922 _test_dbus_p2p_group_idle_timeout(dev, apdev) 4923 finally: 4924 dev[0].global_request("SET p2p_group_idle 0") 4925 4926def _test_dbus_p2p_group_idle_timeout(dev, apdev): 4927 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 4928 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 4929 addr0 = dev[0].p2p_dev_addr() 4930 dev[1].p2p_listen() 4931 4932 class TestDbusP2p(TestDbus): 4933 def __init__(self, bus): 4934 TestDbus.__init__(self, bus) 4935 self.done = False 4936 self.group_started = False 4937 self.peer_group_added = False 4938 self.peer_group_removed = False 4939 4940 def __enter__(self): 4941 gobject.timeout_add(1, self.run_test) 4942 gobject.timeout_add(15000, self.timeout) 4943 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 4944 "DeviceFound") 4945 self.add_signal(self.goNegotiationSuccess, 4946 WPAS_DBUS_IFACE_P2PDEVICE, 4947 "GONegotiationSuccess", 4948 byte_arrays=True) 4949 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 4950 "GroupStarted") 4951 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 4952 "GroupFinished") 4953 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 4954 "PropertiesChanged") 4955 self.loop.run() 4956 return self 4957 4958 def deviceFound(self, path): 4959 logger.debug("deviceFound: path=%s" % path) 4960 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 4961 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 4962 'go_intent': 0} 4963 p2p.Connect(args) 4964 4965 ev = dev1.wait_global_event(["P2P-GO-NEG-REQUEST"], timeout=15) 4966 if ev is None: 4967 raise Exception("Timeout while waiting for GO Neg Request") 4968 dev1.global_request("P2P_CONNECT " + addr0 + " 12345670 display go_intent=15") 4969 ev = dev1.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 4970 if ev is None: 4971 raise Exception("Group formation timed out") 4972 self.sta_group_ev = ev 4973 dev1.close_monitor_global() 4974 dev1.close_monitor_mon() 4975 dev1 = None 4976 4977 def goNegotiationSuccess(self, properties): 4978 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 4979 4980 def groupStarted(self, properties): 4981 logger.debug("groupStarted: " + str(properties)) 4982 self.group_started = True 4983 g_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 4984 properties['interface_object']) 4985 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1', monitor=False) 4986 dev1.group_form_result(self.sta_group_ev) 4987 ifaddr = dev1.group_request("STA-FIRST").splitlines()[0] 4988 # Force disassociation with different reason code so that the 4989 # P2P Client using D-Bus does not get normal group termination event 4990 # from the GO. 4991 dev1.group_request("DEAUTHENTICATE " + ifaddr + " reason=0 test=0") 4992 dev1.remove_group() 4993 4994 def groupFinished(self, properties): 4995 logger.debug("groupFinished: " + str(properties)) 4996 self.done = True 4997 4998 def propertiesChanged(self, interface_name, changed_properties, 4999 invalidated_properties): 5000 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 5001 if interface_name != WPAS_DBUS_P2P_PEER: 5002 return 5003 if not self.group_started: 5004 return 5005 if "Groups" not in changed_properties: 5006 return 5007 if len(changed_properties["Groups"]) > 0: 5008 self.peer_group_added = True 5009 if len(changed_properties["Groups"]) == 0: 5010 self.peer_group_removed = True 5011 self.loop.quit() 5012 5013 def run_test(self, *args): 5014 logger.debug("run_test") 5015 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 5016 return False 5017 5018 def success(self): 5019 return self.done and self.peer_group_added and self.peer_group_removed 5020 5021 with TestDbusP2p(bus) as t: 5022 if not t.success(): 5023 raise Exception("Expected signals not seen") 5024 5025def test_dbus_p2p_wps_failure(dev, apdev): 5026 """D-Bus P2P WPS failure""" 5027 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5028 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5029 addr0 = dev[0].p2p_dev_addr() 5030 5031 class TestDbusP2p(TestDbus): 5032 def __init__(self, bus): 5033 TestDbus.__init__(self, bus) 5034 self.wps_failed = False 5035 self.formation_failure = False 5036 5037 def __enter__(self): 5038 gobject.timeout_add(1, self.run_test) 5039 gobject.timeout_add(15000, self.timeout) 5040 self.add_signal(self.goNegotiationRequest, 5041 WPAS_DBUS_IFACE_P2PDEVICE, 5042 "GONegotiationRequest", 5043 byte_arrays=True) 5044 self.add_signal(self.goNegotiationSuccess, 5045 WPAS_DBUS_IFACE_P2PDEVICE, 5046 "GONegotiationSuccess", 5047 byte_arrays=True) 5048 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 5049 "GroupStarted") 5050 self.add_signal(self.wpsFailed, WPAS_DBUS_IFACE_P2PDEVICE, 5051 "WpsFailed") 5052 self.add_signal(self.groupFormationFailure, 5053 WPAS_DBUS_IFACE_P2PDEVICE, 5054 "GroupFormationFailure") 5055 self.loop.run() 5056 return self 5057 5058 def goNegotiationRequest(self, path, dev_passwd_id, go_intent=0): 5059 logger.debug("goNegotiationRequest: path=%s dev_passwd_id=%d go_intent=%d" % (path, dev_passwd_id, go_intent)) 5060 if dev_passwd_id != 1: 5061 raise Exception("Unexpected dev_passwd_id=%d" % dev_passwd_id) 5062 args = {'peer': path, 'wps_method': 'display', 'pin': '12345670', 5063 'go_intent': 15} 5064 p2p.Connect(args) 5065 5066 def goNegotiationSuccess(self, properties): 5067 logger.debug("goNegotiationSuccess: properties=%s" % str(properties)) 5068 5069 def groupStarted(self, properties): 5070 logger.debug("groupStarted: " + str(properties)) 5071 raise Exception("Unexpected GroupStarted") 5072 5073 def wpsFailed(self, name, args): 5074 logger.debug("wpsFailed - name=%s args=%s" % (name, str(args))) 5075 self.wps_failed = True 5076 if self.formation_failure: 5077 self.loop.quit() 5078 5079 def groupFormationFailure(self, reason): 5080 logger.debug("groupFormationFailure - reason=%s" % reason) 5081 self.formation_failure = True 5082 if self.wps_failed: 5083 self.loop.quit() 5084 5085 def run_test(self, *args): 5086 logger.debug("run_test") 5087 p2p.Listen(10) 5088 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 5089 if not dev1.discover_peer(addr0): 5090 raise Exception("Peer not found") 5091 dev1.global_request("P2P_CONNECT " + addr0 + " 87654321 enter") 5092 return False 5093 5094 def success(self): 5095 return self.wps_failed and self.formation_failure 5096 5097 with TestDbusP2p(bus) as t: 5098 if not t.success(): 5099 raise Exception("Expected signals not seen") 5100 5101def test_dbus_p2p_two_groups(dev, apdev): 5102 """D-Bus P2P with two concurrent groups""" 5103 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5104 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5105 5106 dev[0].request("SET p2p_no_group_iface 0") 5107 addr0 = dev[0].p2p_dev_addr() 5108 addr1 = dev[1].p2p_dev_addr() 5109 addr2 = dev[2].p2p_dev_addr() 5110 dev[1].p2p_start_go(freq=2412) 5111 dev1_group_ifname = dev[1].group_ifname 5112 5113 class TestDbusP2p(TestDbus): 5114 def __init__(self, bus): 5115 TestDbus.__init__(self, bus) 5116 self.done = False 5117 self.peer = None 5118 self.go = None 5119 self.group1 = None 5120 self.group2 = None 5121 self.groups_removed = False 5122 5123 def __enter__(self): 5124 gobject.timeout_add(1, self.run_test) 5125 gobject.timeout_add(15000, self.timeout) 5126 self.add_signal(self.propertiesChanged, dbus.PROPERTIES_IFACE, 5127 "PropertiesChanged", byte_arrays=True) 5128 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 5129 "DeviceFound") 5130 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 5131 "GroupStarted") 5132 self.add_signal(self.groupFinished, WPAS_DBUS_IFACE_P2PDEVICE, 5133 "GroupFinished") 5134 self.add_signal(self.peerJoined, WPAS_DBUS_GROUP, 5135 "PeerJoined") 5136 self.loop.run() 5137 return self 5138 5139 def propertiesChanged(self, interface_name, changed_properties, 5140 invalidated_properties): 5141 logger.debug("propertiesChanged: interface_name=%s changed_properties=%s invalidated_properties=%s" % (interface_name, str(changed_properties), str(invalidated_properties))) 5142 5143 def deviceFound(self, path): 5144 logger.debug("deviceFound: path=%s" % path) 5145 if addr2.replace(':', '') in path: 5146 self.peer = path 5147 elif addr1.replace(':', '') in path: 5148 self.go = path 5149 if self.go and not self.group1: 5150 logger.info("Join the group") 5151 p2p.StopFind() 5152 pin = '12345670' 5153 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 5154 dev1.group_ifname = dev1_group_ifname 5155 dev1.group_request("WPS_PIN any " + pin) 5156 args = {'peer': self.go, 5157 'join': True, 5158 'wps_method': 'pin', 5159 'pin': pin, 5160 'frequency': 2412} 5161 p2p.Connect(args) 5162 5163 def groupStarted(self, properties): 5164 logger.debug("groupStarted: " + str(properties)) 5165 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE, 5166 dbus_interface=dbus.PROPERTIES_IFACE) 5167 logger.debug("p2pdevice properties: " + str(prop)) 5168 5169 g_obj = bus.get_object(WPAS_DBUS_SERVICE, 5170 properties['group_object']) 5171 res = g_obj.GetAll(WPAS_DBUS_GROUP, 5172 dbus_interface=dbus.PROPERTIES_IFACE, 5173 byte_arrays=True) 5174 logger.debug("Group properties: " + str(res)) 5175 5176 if not self.group1: 5177 self.group1 = properties['group_object'] 5178 self.group1iface = properties['interface_object'] 5179 self.g1_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 5180 self.group1iface) 5181 5182 logger.info("Start autonomous GO") 5183 params = dbus.Dictionary({'frequency': 2412}) 5184 p2p.GroupAdd(params) 5185 elif not self.group2: 5186 self.group2 = properties['group_object'] 5187 self.group2iface = properties['interface_object'] 5188 self.g2_if_obj = bus.get_object(WPAS_DBUS_SERVICE, 5189 self.group2iface) 5190 self.g2_bssid = res['BSSID'] 5191 5192 if self.group1 and self.group2: 5193 logger.info("Authorize peer to join the group") 5194 a2 = binascii.unhexlify(addr2.replace(':', '')) 5195 params = {'Role': 'enrollee', 5196 'P2PDeviceAddress': dbus.ByteArray(a2), 5197 'Bssid': dbus.ByteArray(a2), 5198 'Type': 'pin', 5199 'Pin': '12345670'} 5200 g_wps = dbus.Interface(self.g2_if_obj, WPAS_DBUS_IFACE_WPS) 5201 g_wps.Start(params) 5202 5203 bssid = ':'.join(["%02x" % i for i in struct.unpack('6B', self.g2_bssid)]) 5204 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2') 5205 dev2.scan_for_bss(bssid, freq=2412) 5206 dev2.global_request("P2P_CONNECT " + bssid + " 12345670 join freq=2412") 5207 ev = dev2.wait_global_event(["P2P-GROUP-STARTED"], timeout=15) 5208 if ev is None: 5209 raise Exception("Group join timed out") 5210 self.dev2_group_ev = ev 5211 5212 def groupFinished(self, properties): 5213 logger.debug("groupFinished: " + str(properties)) 5214 5215 if self.group1 == properties['group_object']: 5216 self.group1 = None 5217 elif self.group2 == properties['group_object']: 5218 self.group2 = None 5219 5220 if not self.group1 and not self.group2: 5221 self.done = True 5222 self.loop.quit() 5223 5224 def peerJoined(self, peer): 5225 logger.debug("peerJoined: " + peer) 5226 if self.groups_removed: 5227 return 5228 self.check_results() 5229 5230 dev2 = WpaSupplicant('wlan2', '/tmp/wpas-wlan2') 5231 dev2.group_form_result(self.dev2_group_ev) 5232 dev2.remove_group() 5233 5234 logger.info("Disconnect group2") 5235 group_p2p = dbus.Interface(self.g2_if_obj, 5236 WPAS_DBUS_IFACE_P2PDEVICE) 5237 group_p2p.Disconnect() 5238 5239 logger.info("Disconnect group1") 5240 group_p2p = dbus.Interface(self.g1_if_obj, 5241 WPAS_DBUS_IFACE_P2PDEVICE) 5242 group_p2p.Disconnect() 5243 self.groups_removed = True 5244 5245 def check_results(self): 5246 logger.info("Check results with two concurrent groups in operation") 5247 5248 g1_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group1) 5249 res1 = g1_obj.GetAll(WPAS_DBUS_GROUP, 5250 dbus_interface=dbus.PROPERTIES_IFACE, 5251 byte_arrays=True) 5252 5253 g2_obj = bus.get_object(WPAS_DBUS_SERVICE, self.group2) 5254 res2 = g2_obj.GetAll(WPAS_DBUS_GROUP, 5255 dbus_interface=dbus.PROPERTIES_IFACE, 5256 byte_arrays=True) 5257 5258 logger.info("group1 = " + self.group1) 5259 logger.debug("Group properties: " + str(res1)) 5260 5261 logger.info("group2 = " + self.group2) 5262 logger.debug("Group properties: " + str(res2)) 5263 5264 prop = if_obj.GetAll(WPAS_DBUS_IFACE_P2PDEVICE, 5265 dbus_interface=dbus.PROPERTIES_IFACE) 5266 logger.debug("p2pdevice properties: " + str(prop)) 5267 5268 if res1['Role'] != 'client': 5269 raise Exception("Group1 role reported incorrectly: " + res1['Role']) 5270 if res2['Role'] != 'GO': 5271 raise Exception("Group2 role reported incorrectly: " + res2['Role']) 5272 if prop['Role'] != 'device': 5273 raise Exception("p2pdevice role reported incorrectly: " + prop['Role']) 5274 5275 if len(res2['Members']) != 1: 5276 raise Exception("Unexpected Members value for group 2") 5277 5278 def run_test(self, *args): 5279 logger.debug("run_test") 5280 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 5281 return False 5282 5283 def success(self): 5284 return self.done 5285 5286 with TestDbusP2p(bus) as t: 5287 if not t.success(): 5288 raise Exception("Expected signals not seen") 5289 5290 dev[1].remove_group() 5291 5292def test_dbus_p2p_cancel(dev, apdev): 5293 """D-Bus P2P Cancel""" 5294 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5295 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5296 try: 5297 p2p.Cancel() 5298 raise Exception("Unexpected p2p.Cancel() success") 5299 except dbus.exceptions.DBusException as e: 5300 pass 5301 5302 addr0 = dev[0].p2p_dev_addr() 5303 dev[1].p2p_listen() 5304 5305 class TestDbusP2p(TestDbus): 5306 def __init__(self, bus): 5307 TestDbus.__init__(self, bus) 5308 self.done = False 5309 5310 def __enter__(self): 5311 gobject.timeout_add(1, self.run_test) 5312 gobject.timeout_add(15000, self.timeout) 5313 self.add_signal(self.deviceFound, WPAS_DBUS_IFACE_P2PDEVICE, 5314 "DeviceFound") 5315 self.loop.run() 5316 return self 5317 5318 def deviceFound(self, path): 5319 logger.debug("deviceFound: path=%s" % path) 5320 args = {'peer': path, 'wps_method': 'keypad', 'pin': '12345670', 5321 'go_intent': 0} 5322 p2p.Connect(args) 5323 p2p.Cancel() 5324 self.done = True 5325 self.loop.quit() 5326 5327 def run_test(self, *args): 5328 logger.debug("run_test") 5329 p2p.Find(dbus.Dictionary({'DiscoveryType': 'social'})) 5330 return False 5331 5332 def success(self): 5333 return self.done 5334 5335 with TestDbusP2p(bus) as t: 5336 if not t.success(): 5337 raise Exception("Expected signals not seen") 5338 5339def test_dbus_p2p_ip_addr(dev, apdev): 5340 """D-Bus P2P and IP address parameters""" 5341 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5342 p2p = dbus.Interface(if_obj, WPAS_DBUS_IFACE_P2PDEVICE) 5343 5344 vals = [("IpAddrGo", "192.168.43.1"), 5345 ("IpAddrMask", "255.255.255.0"), 5346 ("IpAddrStart", "192.168.43.100"), 5347 ("IpAddrEnd", "192.168.43.199")] 5348 for field, value in vals: 5349 if_obj.Set(WPAS_DBUS_IFACE, field, value, 5350 dbus_interface=dbus.PROPERTIES_IFACE) 5351 val = if_obj.Get(WPAS_DBUS_IFACE, field, 5352 dbus_interface=dbus.PROPERTIES_IFACE) 5353 if val != value: 5354 raise Exception("Unexpected %s value: %s" % (field, val)) 5355 5356 set_ip_addr_info(dev[1]) 5357 5358 dev[0].global_request("SET p2p_go_intent 0") 5359 5360 req = dev[0].global_request("NFC_GET_HANDOVER_REQ NDEF P2P-CR").rstrip() 5361 if "FAIL" in req: 5362 raise Exception("Failed to generate NFC connection handover request") 5363 sel = dev[1].global_request("NFC_GET_HANDOVER_SEL NDEF P2P-CR").rstrip() 5364 if "FAIL" in sel: 5365 raise Exception("Failed to generate NFC connection handover select") 5366 dev[0].dump_monitor() 5367 dev[1].dump_monitor() 5368 res = dev[1].global_request("NFC_REPORT_HANDOVER RESP P2P " + req + " " + sel) 5369 if "FAIL" in res: 5370 raise Exception("Failed to report NFC connection handover to wpa_supplicant(resp)") 5371 res = dev[0].global_request("NFC_REPORT_HANDOVER INIT P2P " + req + " " + sel) 5372 if "FAIL" in res: 5373 raise Exception("Failed to report NFC connection handover to wpa_supplicant(init)") 5374 5375 class TestDbusP2p(TestDbus): 5376 def __init__(self, bus): 5377 TestDbus.__init__(self, bus) 5378 self.done = False 5379 5380 def __enter__(self): 5381 gobject.timeout_add(1, self.run_test) 5382 gobject.timeout_add(15000, self.timeout) 5383 self.add_signal(self.groupStarted, WPAS_DBUS_IFACE_P2PDEVICE, 5384 "GroupStarted") 5385 self.loop.run() 5386 return self 5387 5388 def groupStarted(self, properties): 5389 logger.debug("groupStarted: " + str(properties)) 5390 self.loop.quit() 5391 5392 if 'IpAddrGo' not in properties: 5393 logger.info("IpAddrGo missing from GroupStarted") 5394 ip_addr_go = properties['IpAddrGo'] 5395 addr = "%d.%d.%d.%d" % (ip_addr_go[0], ip_addr_go[1], ip_addr_go[2], ip_addr_go[3]) 5396 if addr != "192.168.42.1": 5397 logger.info("Unexpected IpAddrGo value: " + addr) 5398 self.done = True 5399 5400 def run_test(self, *args): 5401 logger.debug("run_test") 5402 return False 5403 5404 def success(self): 5405 return self.done 5406 5407 with TestDbusP2p(bus) as t: 5408 if not t.success(): 5409 raise Exception("Expected signals not seen") 5410 5411def test_dbus_introspect(dev, apdev): 5412 """D-Bus introspection""" 5413 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5414 5415 res = if_obj.Introspect(WPAS_DBUS_IFACE, 5416 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5417 logger.info("Initial Introspect: " + str(res)) 5418 if res is None or "Introspectable" not in res or "GroupStarted" not in res: 5419 raise Exception("Unexpected initial Introspect response: " + str(res)) 5420 if "FastReauth" not in res or "PassiveScan" not in res: 5421 raise Exception("Unexpected initial Introspect response: " + str(res)) 5422 5423 with alloc_fail(dev[0], 1, "wpa_dbus_introspect"): 5424 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5425 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5426 logger.info("Introspect: " + str(res2)) 5427 if res2 is not None: 5428 raise Exception("Unexpected Introspect response") 5429 5430 with alloc_fail(dev[0], 1, "=add_interface;wpa_dbus_introspect"): 5431 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5432 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5433 logger.info("Introspect: " + str(res2)) 5434 if res2 is None: 5435 raise Exception("No Introspect response") 5436 if len(res2) >= len(res): 5437 raise Exception("Unexpected Introspect response") 5438 5439 with alloc_fail(dev[0], 1, "wpabuf_alloc;add_interface;wpa_dbus_introspect"): 5440 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5441 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5442 logger.info("Introspect: " + str(res2)) 5443 if res2 is None: 5444 raise Exception("No Introspect response") 5445 if len(res2) >= len(res): 5446 raise Exception("Unexpected Introspect response") 5447 5448 with alloc_fail(dev[0], 2, "=add_interface;wpa_dbus_introspect"): 5449 res2 = if_obj.Introspect(WPAS_DBUS_IFACE, 5450 dbus_interface=dbus.INTROSPECTABLE_IFACE) 5451 logger.info("Introspect: " + str(res2)) 5452 if res2 is None: 5453 raise Exception("No Introspect response") 5454 if len(res2) >= len(res): 5455 raise Exception("Unexpected Introspect response") 5456 5457def run_busctl(service, obj): 5458 if not shutil.which("busctl"): 5459 raise HwsimSkip("No busctl available") 5460 logger.info("busctl introspect %s %s" % (service, obj)) 5461 cmd = subprocess.Popen(['busctl', 'introspect', service, obj], 5462 stdout=subprocess.PIPE, 5463 stderr=subprocess.PIPE) 5464 out = cmd.communicate() 5465 cmd.wait() 5466 logger.info("busctl stdout:\n%s" % out[0].strip()) 5467 if len(out[1]) > 0: 5468 logger.info("busctl stderr: %s" % out[1].decode().strip()) 5469 if "Duplicate property" in out[1].decode(): 5470 raise Exception("Duplicate property") 5471 5472def test_dbus_introspect_busctl(dev, apdev): 5473 """D-Bus introspection with busctl""" 5474 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5475 ifaces = dbus_get(dbus, wpas_obj, "Interfaces") 5476 run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH) 5477 run_busctl(WPAS_DBUS_SERVICE, WPAS_DBUS_PATH + "/Interfaces") 5478 run_busctl(WPAS_DBUS_SERVICE, ifaces[0]) 5479 5480 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 5481 bssid = apdev[0]['bssid'] 5482 dev[0].scan_for_bss(bssid, freq=2412) 5483 id = dev[0].add_network() 5484 dev[0].set_network(id, "disabled", "0") 5485 dev[0].set_network_quoted(id, "ssid", "test") 5486 5487 run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/BSSs/0") 5488 run_busctl(WPAS_DBUS_SERVICE, ifaces[0] + "/Networks/0") 5489 5490def test_dbus_ap(dev, apdev): 5491 """D-Bus AddNetwork for AP mode""" 5492 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5493 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5494 5495 ssid = "test-wpa2-psk" 5496 passphrase = 'qwertyuiop' 5497 5498 class TestDbusConnect(TestDbus): 5499 def __init__(self, bus): 5500 TestDbus.__init__(self, bus) 5501 self.started = False 5502 self.sta_added = False 5503 self.sta_removed = False 5504 self.authorized = False 5505 self.deauthorized = False 5506 self.stations = False 5507 5508 def __enter__(self): 5509 gobject.timeout_add(1, self.run_connect) 5510 gobject.timeout_add(15000, self.timeout) 5511 self.add_signal(self.networkAdded, WPAS_DBUS_IFACE, "NetworkAdded") 5512 self.add_signal(self.networkSelected, WPAS_DBUS_IFACE, 5513 "NetworkSelected") 5514 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5515 "PropertiesChanged") 5516 self.add_signal(self.stationAdded, WPAS_DBUS_IFACE, "StationAdded") 5517 self.add_signal(self.stationRemoved, WPAS_DBUS_IFACE, 5518 "StationRemoved") 5519 self.add_signal(self.staAuthorized, WPAS_DBUS_IFACE, 5520 "StaAuthorized") 5521 self.add_signal(self.staDeauthorized, WPAS_DBUS_IFACE, 5522 "StaDeauthorized") 5523 self.loop.run() 5524 return self 5525 5526 def networkAdded(self, network, properties): 5527 logger.debug("networkAdded: %s" % str(network)) 5528 logger.debug(str(properties)) 5529 5530 def networkSelected(self, network): 5531 logger.debug("networkSelected: %s" % str(network)) 5532 self.network_selected = True 5533 5534 def propertiesChanged(self, properties): 5535 logger.debug("propertiesChanged: %s" % str(properties)) 5536 if 'State' in properties and properties['State'] == "completed": 5537 self.started = True 5538 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 5539 dev1.connect(ssid, psk=passphrase, scan_freq="2412") 5540 5541 def stationAdded(self, station, properties): 5542 logger.debug("stationAdded: %s" % str(station)) 5543 logger.debug(str(properties)) 5544 self.sta_added = True 5545 res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations', 5546 dbus_interface=dbus.PROPERTIES_IFACE) 5547 logger.info("Stations: " + str(res)) 5548 if len(res) == 1: 5549 self.stations = True 5550 else: 5551 raise Exception("Missing Stations entry: " + str(res)) 5552 5553 def stationRemoved(self, station): 5554 logger.debug("stationRemoved: %s" % str(station)) 5555 self.sta_removed = True 5556 res = if_obj.Get(WPAS_DBUS_IFACE, 'Stations', 5557 dbus_interface=dbus.PROPERTIES_IFACE) 5558 logger.info("Stations: " + str(res)) 5559 if len(res) != 0: 5560 self.stations = False 5561 raise Exception("Unexpected Stations entry: " + str(res)) 5562 self.loop.quit() 5563 5564 def staAuthorized(self, name): 5565 logger.debug("staAuthorized: " + name) 5566 self.authorized = True 5567 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 5568 dev1.request("DISCONNECT") 5569 5570 def staDeauthorized(self, name): 5571 logger.debug("staDeauthorized: " + name) 5572 self.deauthorized = True 5573 5574 def run_connect(self, *args): 5575 logger.debug("run_connect") 5576 args = dbus.Dictionary({'ssid': ssid, 5577 'key_mgmt': 'WPA-PSK', 5578 'psk': passphrase, 5579 'mode': 2, 5580 'frequency': 2412, 5581 'scan_freq': 2412}, 5582 signature='sv') 5583 self.netw = iface.AddNetwork(args) 5584 iface.SelectNetwork(self.netw) 5585 return False 5586 5587 def success(self): 5588 return self.started and self.sta_added and self.sta_removed and \ 5589 self.authorized and self.deauthorized 5590 5591 with TestDbusConnect(bus) as t: 5592 if not t.success(): 5593 raise Exception("Expected signals not seen") 5594 5595def test_dbus_ap_scan(dev, apdev): 5596 """D-Bus AddNetwork for AP mode and scan""" 5597 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5598 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5599 5600 ssid = "test-wpa2-psk" 5601 passphrase = 'qwertyuiop' 5602 5603 hapd = hostapd.add_ap(apdev[0], {"ssid": "open"}) 5604 bssid = hapd.own_addr() 5605 5606 class TestDbusConnect(TestDbus): 5607 def __init__(self, bus): 5608 TestDbus.__init__(self, bus) 5609 self.started = False 5610 self.scan_completed = False 5611 5612 def __enter__(self): 5613 gobject.timeout_add(1, self.run_connect) 5614 gobject.timeout_add(15000, self.timeout) 5615 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5616 "PropertiesChanged") 5617 self.add_signal(self.scanDone, WPAS_DBUS_IFACE, "ScanDone") 5618 self.loop.run() 5619 return self 5620 5621 def propertiesChanged(self, properties): 5622 logger.debug("propertiesChanged: %s" % str(properties)) 5623 if 'State' in properties and properties['State'] == "completed": 5624 self.started = True 5625 logger.info("Try to scan in AP mode") 5626 iface.Scan({'Type': 'active', 5627 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 5628 logger.info("Scan() returned") 5629 5630 def scanDone(self, success): 5631 logger.debug("scanDone: success=%s" % success) 5632 if self.started: 5633 self.scan_completed = True 5634 self.loop.quit() 5635 5636 def run_connect(self, *args): 5637 logger.debug("run_connect") 5638 args = dbus.Dictionary({'ssid': ssid, 5639 'key_mgmt': 'WPA-PSK', 5640 'psk': passphrase, 5641 'mode': 2, 5642 'frequency': 2412, 5643 'scan_freq': 2412}, 5644 signature='sv') 5645 self.netw = iface.AddNetwork(args) 5646 iface.SelectNetwork(self.netw) 5647 return False 5648 5649 def success(self): 5650 return self.started and self.scan_completed 5651 5652 with TestDbusConnect(bus) as t: 5653 if not t.success(): 5654 raise Exception("Expected signals not seen") 5655 5656def test_dbus_connect_wpa_eap(dev, apdev): 5657 """D-Bus AddNetwork and connection with WPA+WPA2-Enterprise AP""" 5658 skip_without_tkip(dev[0]) 5659 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5660 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5661 5662 ssid = "test-wpa-eap" 5663 params = hostapd.wpa_eap_params(ssid=ssid) 5664 params["wpa"] = "3" 5665 params["rsn_pairwise"] = "CCMP" 5666 hapd = hostapd.add_ap(apdev[0], params) 5667 5668 class TestDbusConnect(TestDbus): 5669 def __init__(self, bus): 5670 TestDbus.__init__(self, bus) 5671 self.done = False 5672 5673 def __enter__(self): 5674 gobject.timeout_add(1, self.run_connect) 5675 gobject.timeout_add(15000, self.timeout) 5676 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5677 "PropertiesChanged") 5678 self.add_signal(self.eap, WPAS_DBUS_IFACE, "EAP") 5679 self.loop.run() 5680 return self 5681 5682 def propertiesChanged(self, properties): 5683 logger.debug("propertiesChanged: %s" % str(properties)) 5684 if 'State' in properties and properties['State'] == "completed": 5685 self.done = True 5686 self.loop.quit() 5687 5688 def eap(self, status, parameter): 5689 logger.debug("EAP: status=%s parameter=%s" % (status, parameter)) 5690 5691 def run_connect(self, *args): 5692 logger.debug("run_connect") 5693 args = dbus.Dictionary({'ssid': ssid, 5694 'key_mgmt': 'WPA-EAP', 5695 'eap': 'PEAP', 5696 'identity': 'user', 5697 'password': 'password', 5698 'ca_cert': 'auth_serv/ca.pem', 5699 'phase2': 'auth=MSCHAPV2', 5700 'scan_freq': 2412}, 5701 signature='sv') 5702 self.netw = iface.AddNetwork(args) 5703 iface.SelectNetwork(self.netw) 5704 return False 5705 5706 def success(self): 5707 return self.done 5708 5709 with TestDbusConnect(bus) as t: 5710 if not t.success(): 5711 raise Exception("Expected signals not seen") 5712 5713def test_dbus_ap_scan_2_ap_mode_scan(dev, apdev): 5714 """AP_SCAN 2 AP mode and D-Bus Scan()""" 5715 try: 5716 _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev) 5717 finally: 5718 dev[0].request("AP_SCAN 1") 5719 5720def _test_dbus_ap_scan_2_ap_mode_scan(dev, apdev): 5721 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5722 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5723 5724 if "OK" not in dev[0].request("AP_SCAN 2"): 5725 raise Exception("Failed to set AP_SCAN 2") 5726 5727 id = dev[0].add_network() 5728 dev[0].set_network(id, "mode", "2") 5729 dev[0].set_network_quoted(id, "ssid", "wpas-ap-open") 5730 dev[0].set_network(id, "key_mgmt", "NONE") 5731 dev[0].set_network(id, "frequency", "2412") 5732 dev[0].set_network(id, "scan_freq", "2412") 5733 dev[0].set_network(id, "disabled", "0") 5734 dev[0].select_network(id) 5735 ev = dev[0].wait_event(["CTRL-EVENT-CONNECTED"], timeout=5) 5736 if ev is None: 5737 raise Exception("AP failed to start") 5738 5739 with fail_test(dev[0], 1, "wpa_driver_nl80211_scan"): 5740 iface.Scan({'Type': 'active', 5741 'AllowRoam': True, 5742 'Channels': [(dbus.UInt32(2412), dbus.UInt32(20))]}) 5743 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED", 5744 "AP-DISABLED"], timeout=5) 5745 if ev is None: 5746 raise Exception("CTRL-EVENT-SCAN-FAILED not seen") 5747 if "AP-DISABLED" in ev: 5748 raise Exception("Unexpected AP-DISABLED event") 5749 if "retry=1" in ev: 5750 # Wait for the retry to scan happen 5751 ev = dev[0].wait_event(["CTRL-EVENT-SCAN-FAILED", 5752 "AP-DISABLED"], timeout=5) 5753 if ev is None: 5754 raise Exception("CTRL-EVENT-SCAN-FAILED not seen - retry") 5755 if "AP-DISABLED" in ev: 5756 raise Exception("Unexpected AP-DISABLED event - retry") 5757 5758 dev[1].connect("wpas-ap-open", key_mgmt="NONE", scan_freq="2412") 5759 dev[1].request("DISCONNECT") 5760 dev[1].wait_disconnected() 5761 dev[0].request("DISCONNECT") 5762 dev[0].wait_disconnected() 5763 5764def test_dbus_expectdisconnect(dev, apdev): 5765 """D-Bus ExpectDisconnect""" 5766 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5767 wpas = dbus.Interface(wpas_obj, WPAS_DBUS_SERVICE) 5768 5769 params = {"ssid": "test-open"} 5770 hapd = hostapd.add_ap(apdev[0], params) 5771 dev[0].connect("test-open", key_mgmt="NONE", scan_freq="2412") 5772 5773 # This does not really verify the behavior other than by going through the 5774 # code path for additional coverage. 5775 wpas.ExpectDisconnect() 5776 dev[0].request("DISCONNECT") 5777 dev[0].wait_disconnected() 5778 5779def test_dbus_save_config(dev, apdev): 5780 """D-Bus SaveConfig""" 5781 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5782 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5783 try: 5784 iface.SaveConfig() 5785 raise Exception("SaveConfig() accepted unexpectedly") 5786 except dbus.exceptions.DBusException as e: 5787 if not str(e).startswith("fi.w1.wpa_supplicant1.UnknownError: Not allowed to update configuration"): 5788 raise Exception("Unexpected error message for SaveConfig(): " + str(e)) 5789 5790def test_dbus_vendor_elem(dev, apdev): 5791 """D-Bus vendor element operations""" 5792 try: 5793 _test_dbus_vendor_elem(dev, apdev) 5794 finally: 5795 dev[0].request("VENDOR_ELEM_REMOVE 1 *") 5796 5797def _test_dbus_vendor_elem(dev, apdev): 5798 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5799 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5800 5801 dev[0].request("VENDOR_ELEM_REMOVE 1 *") 5802 5803 try: 5804 ie = dbus.ByteArray(b"\x00\x00") 5805 iface.VendorElemAdd(-1, ie) 5806 raise Exception("Invalid VendorElemAdd() accepted") 5807 except dbus.exceptions.DBusException as e: 5808 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e): 5809 raise Exception("Unexpected error message for invalid VendorElemAdd[1]: " + str(e)) 5810 5811 try: 5812 ie = dbus.ByteArray(b'') 5813 iface.VendorElemAdd(1, ie) 5814 raise Exception("Invalid VendorElemAdd() accepted") 5815 except dbus.exceptions.DBusException as e: 5816 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e): 5817 raise Exception("Unexpected error message for invalid VendorElemAdd[2]: " + str(e)) 5818 5819 try: 5820 ie = dbus.ByteArray(b"\x00\x01") 5821 iface.VendorElemAdd(1, ie) 5822 raise Exception("Invalid VendorElemAdd() accepted") 5823 except dbus.exceptions.DBusException as e: 5824 if "InvalidArgs" not in str(e) or "Parse error" not in str(e): 5825 raise Exception("Unexpected error message for invalid VendorElemAdd[3]: " + str(e)) 5826 5827 try: 5828 iface.VendorElemGet(-1) 5829 raise Exception("Invalid VendorElemGet() accepted") 5830 except dbus.exceptions.DBusException as e: 5831 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e): 5832 raise Exception("Unexpected error message for invalid VendorElemGet[1]: " + str(e)) 5833 5834 try: 5835 iface.VendorElemGet(1) 5836 raise Exception("Invalid VendorElemGet() accepted") 5837 except dbus.exceptions.DBusException as e: 5838 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e): 5839 raise Exception("Unexpected error message for invalid VendorElemGet[2]: " + str(e)) 5840 5841 try: 5842 ie = dbus.ByteArray(b"\x00\x00") 5843 iface.VendorElemRem(-1, ie) 5844 raise Exception("Invalid VendorElemRemove() accepted") 5845 except dbus.exceptions.DBusException as e: 5846 if "InvalidArgs" not in str(e) or "Invalid ID" not in str(e): 5847 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) 5848 5849 try: 5850 ie = dbus.ByteArray(b'') 5851 iface.VendorElemRem(1, ie) 5852 raise Exception("Invalid VendorElemRemove() accepted") 5853 except dbus.exceptions.DBusException as e: 5854 if "InvalidArgs" not in str(e) or "Invalid value" not in str(e): 5855 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) 5856 5857 iface.VendorElemRem(1, b"*") 5858 5859 ie = dbus.ByteArray(b"\x00\x01\x00") 5860 iface.VendorElemAdd(1, ie) 5861 5862 val = iface.VendorElemGet(1) 5863 if len(val) != len(ie): 5864 raise Exception("Unexpected VendorElemGet length") 5865 for i in range(len(val)): 5866 if val[i] != dbus.Byte(ie[i]): 5867 raise Exception("Unexpected VendorElemGet data") 5868 5869 ie2 = dbus.ByteArray(b"\xe0\x00") 5870 iface.VendorElemAdd(1, ie2) 5871 5872 ies = ie + ie2 5873 val = iface.VendorElemGet(1) 5874 if len(val) != len(ies): 5875 raise Exception("Unexpected VendorElemGet length[2]") 5876 for i in range(len(val)): 5877 if val[i] != dbus.Byte(ies[i]): 5878 raise Exception("Unexpected VendorElemGet data[2]") 5879 5880 try: 5881 test_ie = dbus.ByteArray(b"\x01\x01") 5882 iface.VendorElemRem(1, test_ie) 5883 raise Exception("Invalid VendorElemRemove() accepted") 5884 except dbus.exceptions.DBusException as e: 5885 if "InvalidArgs" not in str(e) or "Parse error" not in str(e): 5886 raise Exception("Unexpected error message for invalid VendorElemRemove[1]: " + str(e)) 5887 5888 iface.VendorElemRem(1, ie) 5889 val = iface.VendorElemGet(1) 5890 if len(val) != len(ie2): 5891 raise Exception("Unexpected VendorElemGet length[3]") 5892 5893 iface.VendorElemRem(1, b"*") 5894 try: 5895 iface.VendorElemGet(1) 5896 raise Exception("Invalid VendorElemGet() accepted after removal") 5897 except dbus.exceptions.DBusException as e: 5898 if "InvalidArgs" not in str(e) or "ID value does not exist" not in str(e): 5899 raise Exception("Unexpected error message for invalid VendorElemGet after removal: " + str(e)) 5900 5901def test_dbus_assoc_reject(dev, apdev): 5902 """D-Bus AssocStatusCode""" 5903 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5904 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 5905 5906 ssid = "test-open" 5907 params = {"ssid": ssid, 5908 "max_listen_interval": "1"} 5909 hapd = hostapd.add_ap(apdev[0], params) 5910 5911 class TestDbusConnect(TestDbus): 5912 def __init__(self, bus): 5913 TestDbus.__init__(self, bus) 5914 self.assoc_status_seen = False 5915 self.state = 0 5916 5917 def __enter__(self): 5918 gobject.timeout_add(1, self.run_connect) 5919 gobject.timeout_add(15000, self.timeout) 5920 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 5921 "PropertiesChanged") 5922 self.loop.run() 5923 return self 5924 5925 def propertiesChanged(self, properties): 5926 logger.debug("propertiesChanged: %s" % str(properties)) 5927 if 'AssocStatusCode' in properties: 5928 status = properties['AssocStatusCode'] 5929 if status != 51: 5930 logger.info("Unexpected status code: " + str(status)) 5931 else: 5932 self.assoc_status_seen = True 5933 iface.Disconnect() 5934 self.loop.quit() 5935 5936 def run_connect(self, *args): 5937 args = dbus.Dictionary({'ssid': ssid, 5938 'key_mgmt': 'NONE', 5939 'scan_freq': 2412}, 5940 signature='sv') 5941 self.netw = iface.AddNetwork(args) 5942 iface.SelectNetwork(self.netw) 5943 return False 5944 5945 def success(self): 5946 return self.assoc_status_seen 5947 5948 with TestDbusConnect(bus) as t: 5949 if not t.success(): 5950 raise Exception("Expected signals not seen") 5951 5952def test_dbus_mesh(dev, apdev): 5953 """D-Bus mesh""" 5954 check_mesh_support(dev[0]) 5955 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 5956 mesh = dbus.Interface(if_obj, WPAS_DBUS_IFACE_MESH) 5957 5958 add_open_mesh_network(dev[1]) 5959 addr1 = dev[1].own_addr() 5960 5961 class TestDbusMesh(TestDbus): 5962 def __init__(self, bus): 5963 TestDbus.__init__(self, bus) 5964 self.done = False 5965 5966 def __enter__(self): 5967 gobject.timeout_add(1, self.run_test) 5968 gobject.timeout_add(15000, self.timeout) 5969 self.add_signal(self.meshGroupStarted, WPAS_DBUS_IFACE_MESH, 5970 "MeshGroupStarted") 5971 self.add_signal(self.meshGroupRemoved, WPAS_DBUS_IFACE_MESH, 5972 "MeshGroupRemoved") 5973 self.add_signal(self.meshPeerConnected, WPAS_DBUS_IFACE_MESH, 5974 "MeshPeerConnected") 5975 self.add_signal(self.meshPeerDisconnected, WPAS_DBUS_IFACE_MESH, 5976 "MeshPeerDisconnected") 5977 self.loop.run() 5978 return self 5979 5980 def meshGroupStarted(self, args): 5981 logger.debug("MeshGroupStarted: " + str(args)) 5982 5983 def meshGroupRemoved(self, args): 5984 logger.debug("MeshGroupRemoved: " + str(args)) 5985 self.done = True 5986 self.loop.quit() 5987 5988 def meshPeerConnected(self, args): 5989 logger.debug("MeshPeerConnected: " + str(args)) 5990 5991 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshPeers', 5992 dbus_interface=dbus.PROPERTIES_IFACE, 5993 byte_arrays=True) 5994 logger.debug("MeshPeers: " + str(res)) 5995 if len(res) != 1: 5996 raise Exception("Unexpected number of MeshPeer values") 5997 if binascii.hexlify(res[0]).decode() != addr1.replace(':', ''): 5998 raise Exception("Unexpected peer address") 5999 6000 res = if_obj.Get(WPAS_DBUS_IFACE_MESH, 'MeshGroup', 6001 dbus_interface=dbus.PROPERTIES_IFACE, 6002 byte_arrays=True) 6003 logger.debug("MeshGroup: " + str(res)) 6004 if res != b"wpas-mesh-open": 6005 raise Exception("Unexpected MeshGroup") 6006 dev1 = WpaSupplicant('wlan1', '/tmp/wpas-wlan1') 6007 dev1.mesh_group_remove() 6008 6009 def meshPeerDisconnected(self, args): 6010 logger.debug("MeshPeerDisconnected: " + str(args)) 6011 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0') 6012 dev0.mesh_group_remove() 6013 6014 def run_test(self, *args): 6015 logger.debug("run_test") 6016 dev0 = WpaSupplicant('wlan0', '/tmp/wpas-wlan0') 6017 add_open_mesh_network(dev0) 6018 return False 6019 6020 def success(self): 6021 return self.done 6022 6023 with TestDbusMesh(bus) as t: 6024 if not t.success(): 6025 raise Exception("Expected signals not seen") 6026 6027def test_dbus_roam(dev, apdev): 6028 """D-Bus Roam""" 6029 (bus, wpas_obj, path, if_obj) = prepare_dbus(dev[0]) 6030 iface = dbus.Interface(if_obj, WPAS_DBUS_IFACE) 6031 6032 ssid = "test-wpa2-psk" 6033 passphrase = 'qwertyuiop' 6034 params = hostapd.wpa2_params(ssid=ssid, passphrase=passphrase) 6035 hapd = hostapd.add_ap(apdev[0], params) 6036 hapd2 = hostapd.add_ap(apdev[1], params) 6037 bssid = apdev[0]['bssid'] 6038 dev[0].scan_for_bss(bssid, freq=2412) 6039 bssid2 = apdev[1]['bssid'] 6040 dev[0].scan_for_bss(bssid2, freq=2412) 6041 6042 class TestDbusConnect(TestDbus): 6043 def __init__(self, bus): 6044 TestDbus.__init__(self, bus) 6045 self.state = 0 6046 6047 def __enter__(self): 6048 gobject.timeout_add(1, self.run_connect) 6049 gobject.timeout_add(15000, self.timeout) 6050 self.add_signal(self.propertiesChanged, WPAS_DBUS_IFACE, 6051 "PropertiesChanged") 6052 self.loop.run() 6053 return self 6054 6055 def propertiesChanged(self, properties): 6056 logger.debug("propertiesChanged: %s" % str(properties)) 6057 if 'State' in properties and properties['State'] == "completed": 6058 if self.state == 0: 6059 self.state = 1 6060 cur = properties["CurrentBSS"] 6061 bss_obj = bus.get_object(WPAS_DBUS_SERVICE, cur) 6062 res = bss_obj.Get(WPAS_DBUS_BSS, 'BSSID', 6063 dbus_interface=dbus.PROPERTIES_IFACE) 6064 bssid_str = '' 6065 for item in res: 6066 if len(bssid_str) > 0: 6067 bssid_str += ':' 6068 bssid_str += '%02x' % item 6069 dst = bssid if bssid_str == bssid2 else bssid2 6070 iface.Roam(dst) 6071 elif self.state == 1: 6072 if "RoamComplete" in properties and \ 6073 properties["RoamComplete"]: 6074 self.state = 2 6075 self.loop.quit() 6076 6077 def run_connect(self, *args): 6078 logger.debug("run_connect") 6079 args = dbus.Dictionary({'ssid': ssid, 6080 'key_mgmt': 'WPA-PSK', 6081 'psk': passphrase, 6082 'scan_freq': 2412}, 6083 signature='sv') 6084 self.netw = iface.AddNetwork(args) 6085 iface.SelectNetwork(self.netw) 6086 return False 6087 6088 def success(self): 6089 return self.state == 2 6090 6091 with TestDbusConnect(bus) as t: 6092 if not t.success(): 6093 raise Exception("Expected signals not seen") 6094