# hwsim testing utilities
# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
#
# This software may be distributed under the terms of the BSD license.
# See README for more details.

import os
import time
import logging
logger = logging.getLogger()

from wpasupplicant import WpaSupplicant

def _data_test_request(dev, group, cmd):
    """Send cmd with dev.group_request() if group is set, else dev.request()."""
    if group:
        return dev.group_request(cmd)
    return dev.request(cmd)

def _wait_data_test_rx(dev, group, timeout):
    """Wait for a DATA-TEST-RX event on the group or the main interface."""
    if group:
        return dev.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
    return dev.wait_event(["DATA-TEST-RX"], timeout=timeout)

def _check_data_len(ev, send_len, errmsg):
    """Raise Exception(errmsg) if the len= field in ev does not match send_len.

    When send_len is None the event must not contain a len= field at all.
    """
    if send_len is not None:
        if " len=" + str(send_len) not in ev:
            raise Exception(errmsg)
    elif " len=" in ev:
        raise Exception(errmsg)

def _enable_data_test(dev, group, ifname):
    """Issue DATA_TEST_CONFIG 1 (optionally bound to ifname) on one device."""
    cmd = "DATA_TEST_CONFIG 1"
    if ifname:
        cmd = cmd + " ifname=" + ifname
    if "OK" not in _data_test_request(dev, group, cmd):
        raise Exception("Failed to enable data test functionality")

def _run_unicast_test(src, dst, srcgroup, dstgroup, src_addr, dst_addr, tos,
                      timeout, send_len, label):
    """Send one unicast test frame from src to dst and verify its reception.

    label (e.g. "dev1->dev2") is only used to build the error messages.
    """
    cmd = "DATA_TEST_TX {} {} {}".format(dst_addr, src_addr, tos)
    if send_len is not None:
        cmd += " len=" + str(send_len)
    _data_test_request(src, srcgroup, cmd)
    ev = _wait_data_test_rx(dst, dstgroup, timeout)
    if ev is None:
        raise Exception("{} unicast data delivery failed".format(label))
    if "DATA-TEST-RX {} {}".format(dst_addr, src_addr) not in ev:
        raise Exception("Unexpected {} unicast data result".format(label))
    _check_data_len(ev, send_len,
                    "Unexpected {} unicast data length".format(label))

def config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2):
    """Enable the data test functionality on dev1 and then on dev2.

    devNgroup selects group_request() instead of request() for that device;
    ifnameN, when set, is appended to the command as "ifname=<ifnameN>".
    Raises Exception if either device does not reply with OK.
    """
    _enable_data_test(dev1, dev1group, ifname1)
    _enable_data_test(dev2, dev2group, ifname2)

def run_multicast_connectivity_test(dev1, dev2, tos=None,
                                    dev1group=False, dev2group=False,
                                    ifname1=None, ifname2=None,
                                    config=True, timeout=5,
                                    send_len=None, multicast_to_unicast=False,
                                    broadcast_retry_c=1):
    """Send a broadcast test frame from dev1 and verify that dev2 receives it.

    tos: TOS value placed in the DATA_TEST_TX command; None is sent as 0.
    config: enable the data test functionality first. It is not disabled
        again by this function.
    send_len: if not None, sent as len=<send_len> and required in the event.
    multicast_to_unicast: expect the frame to arrive addressed to dev2's own
        address instead of the broadcast address.
    broadcast_retry_c: number of attempts; only the failure of the last
        attempt is propagated.

    Raises Exception if the frame is not received as expected.
    """
    addr1 = dev1.get_addr(dev1group)
    addr2 = dev2.get_addr(dev2group)

    if config:
        config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)

    # Do not leak the literal string "None" into the control interface
    # command; test_connectivity() maps a missing TOS to 0 in the same way.
    if tos is None:
        tos = 0
    cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
    if send_len is not None:
        cmd += " len=" + str(send_len)

    bcast_rx = "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1)
    ucast_rx = "DATA-TEST-RX {} {}".format(addr2, addr1)
    for i in range(broadcast_retry_c):
        try:
            _data_test_request(dev1, dev1group, cmd)
            ev = _wait_data_test_rx(dev2, dev2group, timeout)
            if ev is None:
                raise Exception("dev1->dev2 broadcast data delivery failed")
            if multicast_to_unicast:
                if bcast_rx in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing")
                if ucast_rx not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled)")
            else:
                if bcast_rx not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result")
            _check_data_len(ev, send_len,
                            "Unexpected dev1->dev2 broadcast data length")
            break
        except Exception:
            # Broadcast frames may be lost; only the last attempt is fatal.
            if i == broadcast_retry_c - 1:
                raise

def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                          ifname1=None, ifname2=None, config=True, timeout=5,
                          multicast_to_unicast=False, broadcast=True,
                          send_len=None):
    """Run one round of bidirectional data connectivity tests.

    The sequence is: dev1->dev2 unicast, dev1->dev2 broadcast, dev2->dev1
    unicast, dev2->dev1 broadcast. The broadcast steps are skipped when
    broadcast is False. multicast_to_unicast is applied only to the
    dev2->dev1 broadcast check.

    When config is True, the data test functionality is enabled before the
    test and disabled on both devices afterwards, even if the test fails.

    Raises Exception on the first failed step.
    """
    addr1 = dev1.get_addr(dev1group)
    addr2 = dev2.get_addr(dev2group)

    dev1.dump_monitor()
    dev2.dump_monitor()

    # Allow more broadcast attempts when either device has a hostname set
    # (presumably a remote device where frame loss is more likely).
    if dev1.hostname is None and dev2.hostname is None:
        broadcast_retry_c = 1
    else:
        broadcast_retry_c = 10

    try:
        if config:
            config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)

        _run_unicast_test(dev1, dev2, dev1group, dev2group, addr1, addr2,
                          tos, timeout, send_len, "dev1->dev2")

        if broadcast:
            run_multicast_connectivity_test(dev1, dev2, tos,
                                            dev1group, dev2group,
                                            ifname1, ifname2, False, timeout,
                                            send_len, False, broadcast_retry_c)

        _run_unicast_test(dev2, dev1, dev2group, dev1group, addr2, addr1,
                          tos, timeout, send_len, "dev2->dev1")

        if broadcast:
            run_multicast_connectivity_test(dev2, dev1, tos,
                                            dev2group, dev1group,
                                            ifname2, ifname1, False, timeout,
                                            send_len, multicast_to_unicast,
                                            broadcast_retry_c)

    finally:
        if config:
            # Make sure dev2 is cleaned up even if the dev1 request raises.
            try:
                _data_test_request(dev1, dev1group, "DATA_TEST_CONFIG 0")
            finally:
                _data_test_request(dev2, dev2group, "DATA_TEST_CONFIG 0")

def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
                      dev1group=False, dev2group=False,
                      ifname1=None, ifname2=None, config=True, timeout=5,
                      multicast_to_unicast=False, success_expected=True,
                      broadcast=True, send_len=None):
    """Run run_connectivity_test() up to max_tries times.

    dscp: if set (non-zero), overrides tos with dscp << 2.
    tos: TOS value; a missing value is treated as 0.
    max_tries: number of attempts, with a one second sleep between them.
    success_expected: when False, the meaning is inverted and an exception
        is raised if any attempt succeeds.

    Raises Exception wrapping the last failure when success was expected
    but no attempt succeeded.
    """
    if dscp:
        tos = dscp << 2
    if not tos:
        tos = 0

    success = False
    last_err = None
    for i in range(0, max_tries):
        try:
            run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
                                  ifname1, ifname2, config=config,
                                  timeout=timeout,
                                  multicast_to_unicast=multicast_to_unicast,
                                  broadcast=broadcast, send_len=send_len)
            success = True
            break
        except Exception as e:
            last_err = e
            if i + 1 < max_tries:
                time.sleep(1)
    if success_expected and not success:
        raise Exception(last_err)
    if not success_expected and success:
        raise Exception("Unexpected connectivity detected")

def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
                            max_tries=1, timeout=5):
    """Run test_connectivity() with ifname used as ifname2 (dev2 side)."""
    test_connectivity(dev1, dev2, dscp, tos, ifname2=ifname,
                      max_tries=max_tries, timeout=timeout)

def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() using the group interface of both devices."""
    test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=True)

def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() using the group interface of dev1 only."""
    test_connectivity(dev1, dev2, dscp, tos, dev1group=True, dev2group=False)

def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with default (non-group) interfaces."""
    test_connectivity(dev1, dev2, dscp, tos)

# Values accepted by set_powersave()
(PS_DISABLED, PS_ENABLED, PS_AUTO_POLL, PS_MANUAL_POLL) = list(range(4))

def set_powersave(dev, val):
    """Write val (one of the PS_* values) to the hwsim "ps" debugfs file.

    The file is located under the phy reported by the driver status of dev.
    Raises Exception if the command returns a non-zero status.
    """
    phy = dev.get_driver_status_field("phyname")
    fname = '/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phy
    data = '%d' % val
    (res, data) = dev.cmd_execute(["echo", data, ">", fname], shell=True)
    if res != 0:
        raise Exception("Failed to set power save for device")

def set_group_map(dev, val):
    """Write val to the hwsim "group" debugfs file of the phy used by dev.

    Raises Exception if the command returns a non-zero status.
    """
    phy = dev.get_driver_status_field("phyname")
    fname = '/sys/kernel/debug/ieee80211/%s/hwsim/group' % phy
    data = '%d' % val
    (res, data) = dev.cmd_execute(["echo", data, ">", fname], shell=True)
    if res != 0:
        raise Exception("Failed to set group map for %s" % phy)

def set_rx_rssi(dev, val):
    """
    Configure signal strength when receiving transmitted frames.
    mac80211_hwsim driver sets rssi to: TX power - 50
    According to that set tx_power in order to get the desired RSSI.
    Valid RSSI range: -50 to -30.

    Raises Exception if the iw command returns a non-zero status.
    """
    # iw takes the fixed TX power in mBm (dBm * 100).
    tx_power = (val + 50) * 100
    ifname = dev.get_driver_status_field("ifname")
    (res, data) = dev.cmd_execute(['iw', ifname, 'set', 'txpower',
                                   'fixed', str(tx_power)])
    if res != 0:
        raise Exception("Failed to set RSSI to %d" % val)

def reset_rx_rssi(dev):
    """Restore the RX RSSI to -30 using set_rx_rssi()."""
    set_rx_rssi(dev, -30)