1# hwsim testing utilities
2# Copyright (c) 2013-2014, Jouni Malinen <j@w1.fi>
3#
4# This software may be distributed under the terms of the BSD license.
5# See README for more details.
6
7import os
8import time
9import logging
10logger = logging.getLogger()
11
12from wpasupplicant import WpaSupplicant
13
def config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2):
    """Enable the data test functionality on dev1 and then on dev2.

    Each device receives "DATA_TEST_CONFIG 1", with " ifname=<name>"
    appended when the matching ifnameX argument is set. The command is
    sent with group_request() when the matching devXgroup flag is set
    and with request() otherwise.

    Raises Exception when a reply does not contain "OK". dev2 is not
    contacted if dev1 already failed.
    """
    targets = ((dev1, dev1group, ifname1), (dev2, dev2group, ifname2))
    for dev, use_group, ifname in targets:
        command = "DATA_TEST_CONFIG 1"
        if ifname:
            command += " ifname=" + ifname
        send = dev.group_request if use_group else dev.request
        if "OK" not in send(command):
            raise Exception("Failed to enable data test functionality")
34
def run_multicast_connectivity_test(dev1, dev2, tos=None,
                                    dev1group=False, dev2group=False,
                                    ifname1=None, ifname2=None,
                                    config=True, timeout=5,
                                    send_len=None, multicast_to_unicast=False,
                                    broadcast_retry_c=1):
    """Verify that a broadcast data test frame sent by dev1 reaches dev2.

    dev1 is asked to transmit a frame to ff:ff:ff:ff:ff:ff and dev2 is
    expected to report a matching DATA-TEST-RX event.

    Arguments:
    dev1, dev2 -- sending and receiving device
    tos -- TOS value placed in the DATA_TEST_TX command; None is sent as 0
    dev1group, dev2group -- use the group variants of get_addr(),
        request() and event waiting for the respective device
    ifname1, ifname2 -- passed to config_data_test() when config is set
    config -- call config_data_test() before transmitting; this function
        does not send "DATA_TEST_CONFIG 0" afterwards
    timeout -- timeout passed to the event wait on dev2
    send_len -- when not None, added as " len=<n>" to the command and
        required in the event; when None the event must not carry " len="
    multicast_to_unicast -- when set, the event must show dev2's own
        address as destination instead of the broadcast address
    broadcast_retry_c -- number of transmit/verify attempts; only the
        failure of the last attempt is raised. With a value below 1
        nothing is transmitted and the function returns silently.

    Raises Exception if the last attempt fails any check.
    """
    addr1 = dev1.get_addr(dev1group)
    addr2 = dev2.get_addr(dev2group)

    if config:
        config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)

    # The default tos=None used to be formatted verbatim, putting the text
    # "None" into the control interface command. Send an explicit 0 instead.
    if tos is None:
        tos = 0

    cmd = "DATA_TEST_TX ff:ff:ff:ff:ff:ff {} {}".format(addr1, tos)
    if send_len is not None:
        cmd += " len=" + str(send_len)

    transmit = dev1.group_request if dev1group else dev1.request
    wait_rx = dev2.wait_group_event if dev2group else dev2.wait_event

    for i in range(broadcast_retry_c):
        try:
            transmit(cmd)
            ev = wait_rx(["DATA-TEST-RX"], timeout=timeout)
            if ev is None:
                raise Exception("dev1->dev2 broadcast data delivery failed")
            if multicast_to_unicast:
                # Frame must have been converted: no broadcast destination,
                # dev2's own address instead.
                if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result: multicast to unicast conversion missing")
                if "DATA-TEST-RX {} {}".format(addr2, addr1) not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result (multicast to unicast enabled)")
            else:
                if "DATA-TEST-RX ff:ff:ff:ff:ff:ff {}".format(addr1) not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data result")
            if send_len is not None:
                if " len=" + str(send_len) not in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data length")
            else:
                if " len=" in ev:
                    raise Exception("Unexpected dev1->dev2 broadcast data length")
            break
        except Exception:
            # Swallow failures of all but the final attempt.
            if i == broadcast_retry_c - 1:
                raise
81
def run_connectivity_test(dev1, dev2, tos, dev1group=False, dev2group=False,
                          ifname1=None, ifname2=None, config=True, timeout=5,
                          multicast_to_unicast=False, broadcast=True,
                          send_len=None):
    """Run a single bidirectional data connectivity check between two devices.

    Sequence: dev1->dev2 unicast, dev1->dev2 broadcast, dev2->dev1 unicast,
    dev2->dev1 broadcast. The broadcast steps are skipped when broadcast is
    false. multicast_to_unicast is only applied to the dev2->dev1 broadcast
    step; the dev1->dev2 broadcast step always runs with it disabled.

    When config is set, the data test functionality is enabled through
    config_data_test() first and "DATA_TEST_CONFIG 0" is sent to both
    devices at the end, even if a check failed.

    Raises Exception on the first failed step.
    """
    addr1 = dev1.get_addr(dev1group)
    addr2 = dev2.get_addr(dev2group)

    dev1.dump_monitor()
    dev2.dump_monitor()

    # Broadcast is attempted once when neither device has a hostname set,
    # otherwise up to ten times.
    # NOTE(review): hostname presumably marks a remote device - confirm.
    no_hostnames = dev1.hostname is None and dev2.hostname is None
    broadcast_retry_c = 1 if no_hostnames else 10

    def send_cmd(dev, use_group, command):
        # Route the command through the group interface when requested.
        if use_group:
            return dev.group_request(command)
        return dev.request(command)

    def check_unicast(src, src_group, src_addr, dst, dst_group, dst_addr,
                      direction):
        # Transmit one unicast frame src -> dst and validate the RX event.
        tx_cmd = "DATA_TEST_TX {} {} {}".format(dst_addr, src_addr, tos)
        if send_len is not None:
            tx_cmd += " len=" + str(send_len)
        send_cmd(src, src_group, tx_cmd)

        if dst_group:
            rx = dst.wait_group_event(["DATA-TEST-RX"], timeout=timeout)
        else:
            rx = dst.wait_event(["DATA-TEST-RX"], timeout=timeout)
        if rx is None:
            raise Exception(direction + " unicast data delivery failed")
        if "DATA-TEST-RX {} {}".format(dst_addr, src_addr) not in rx:
            raise Exception("Unexpected " + direction +
                            " unicast data result")

        if send_len is None:
            bad_length = " len=" in rx
        else:
            bad_length = " len=" + str(send_len) not in rx
        if bad_length:
            raise Exception("Unexpected " + direction +
                            " unicast data length")

    try:
        if config:
            config_data_test(dev1, dev2, dev1group, dev2group, ifname1, ifname2)

        check_unicast(dev1, dev1group, addr1, dev2, dev2group, addr2,
                      "dev1->dev2")
        if broadcast:
            run_multicast_connectivity_test(
                dev1, dev2, tos, dev1group, dev2group, ifname1, ifname2,
                config=False, timeout=timeout, send_len=send_len,
                multicast_to_unicast=False,
                broadcast_retry_c=broadcast_retry_c)

        check_unicast(dev2, dev2group, addr2, dev1, dev1group, addr1,
                      "dev2->dev1")
        if broadcast:
            run_multicast_connectivity_test(
                dev2, dev1, tos, dev2group, dev1group, ifname2, ifname1,
                config=False, timeout=timeout, send_len=send_len,
                multicast_to_unicast=multicast_to_unicast,
                broadcast_retry_c=broadcast_retry_c)
    finally:
        if config:
            send_cmd(dev1, dev1group, "DATA_TEST_CONFIG 0")
            send_cmd(dev2, dev2group, "DATA_TEST_CONFIG 0")
168
def test_connectivity(dev1, dev2, dscp=None, tos=None, max_tries=1,
                      dev1group=False, dev2group=False,
                      ifname1=None, ifname2=None, config=True, timeout=5,
                      multicast_to_unicast=False, success_expected=True,
                      broadcast=True, send_len=None):
    """Check data connectivity between two devices, with optional retries.

    run_connectivity_test() is called up to max_tries times, sleeping one
    second between attempts, until one attempt passes.

    A truthy dscp overrides tos with dscp shifted left by two bits; a
    missing or zero tos is sent as 0.

    Raises Exception wrapping the last failure when connectivity was
    expected but no attempt passed, and Exception("Unexpected connectivity
    detected") when success_expected is false but an attempt passed.
    """
    if dscp:
        tos = dscp << 2
    tos = tos or 0

    connected = False
    failure = None
    for attempt in range(max_tries):
        try:
            run_connectivity_test(dev1, dev2, tos, dev1group, dev2group,
                                  ifname1, ifname2, config=config,
                                  timeout=timeout,
                                  multicast_to_unicast=multicast_to_unicast,
                                  broadcast=broadcast, send_len=send_len)
        except Exception as err:
            failure = err
            # No point in waiting after the final attempt.
            if attempt + 1 < max_tries:
                time.sleep(1)
        else:
            connected = True
            break

    if connected:
        if not success_expected:
            raise Exception("Unexpected connectivity detected")
    elif success_expected:
        raise Exception(failure)
198
def test_connectivity_iface(dev1, dev2, ifname, dscp=None, tos=None,
                            max_tries=1, timeout=5):
    """Run test_connectivity() with ifname used as dev2's interface name."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos, max_tries=max_tries,
                      ifname2=ifname, timeout=timeout)
203
def test_connectivity_p2p(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with the group flag set for both devices."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=True)
206
def test_connectivity_p2p_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with the group flag set for dev1 only."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos,
                      dev1group=True, dev2group=False)
209
def test_connectivity_sta(dev1, dev2, dscp=None, tos=None):
    """Run test_connectivity() with default settings for both devices."""
    test_connectivity(dev1, dev2, dscp=dscp, tos=tos)
212
# Power save mode identifiers, numbered consecutively from zero.
# NOTE(review): presumably the values meant for set_powersave() - confirm.
PS_DISABLED = 0
PS_ENABLED = 1
PS_AUTO_POLL = 2
PS_MANUAL_POLL = 3
214
def set_powersave(dev, val):
    """Write val to the hwsim debugfs "ps" file of the device's phy.

    The phy name is read from the "phyname" driver status field and the
    write is done by running an echo command through dev.cmd_execute()
    with shell=True.

    Raises Exception if the command returns a non-zero result.
    """
    phyname = dev.get_driver_status_field("phyname")
    ps_file = '/sys/kernel/debug/ieee80211/%s/hwsim/ps' % phyname
    argv = ["echo", '%d' % val, ">", ps_file]
    status, _output = dev.cmd_execute(argv, shell=True)
    if status == 0:
        return
    raise Exception("Failed to set power save for device")
222
def set_group_map(dev, val):
    """Write val to the hwsim debugfs "group" file of the device's phy.

    The phy name is read from the "phyname" driver status field and the
    write is done by running an echo command through dev.cmd_execute()
    with shell=True.

    Raises Exception naming the phy if the command returns a non-zero
    result.
    """
    phyname = dev.get_driver_status_field("phyname")
    group_file = '/sys/kernel/debug/ieee80211/%s/hwsim/group' % phyname
    argv = ["echo", '%d' % val, ">", group_file]
    status, _output = dev.cmd_execute(argv, shell=True)
    if status == 0:
        return
    raise Exception("Failed to set group map for %s" % phyname)
230
def set_rx_rssi(dev, val):
    """
    Set the signal strength seen when frames transmitted by dev are received.

    The mac80211_hwsim driver reports RSSI as TX power - 50, so the TX
    power of dev is fixed to the value that produces the requested RSSI.
    Valid RSSI range: -50 to -30 (the range is not checked here).

    Raises Exception if the iw command returns a non-zero result.
    """
    # NOTE(review): the factor of 100 presumably converts dBm to the mBm
    # unit expected by "iw ... set txpower fixed" - confirm.
    tx_power = 100 * (val + 50)
    ifname = dev.get_driver_status_field("ifname")
    argv = ['iw', ifname, 'set', 'txpower', 'fixed', str(tx_power)]
    status, _output = dev.cmd_execute(argv)
    if status == 0:
        return
    raise Exception("Failed to set RSSI to %d" % val)
244
def reset_rx_rssi(dev):
    """Set the RX RSSI of dev back to -30 through set_rx_rssi()."""
    default_rssi = -30
    set_rx_rssi(dev, default_rssi)
247