#!/usr/bin/env python3

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# TODO: (FreeBSD) add test for comparing connections with 'sockstat' cmd.


"""Tests specific to all BSD platforms."""


import datetime
import os
import re
import time

import psutil
from psutil import BSD
from psutil import FREEBSD
from psutil import NETBSD
from psutil import OPENBSD
from psutil.tests import spawn_testproc
from psutil.tests import HAS_BATTERY
from psutil.tests import PsutilTestCase
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import TOLERANCE_SYS_MEM
from psutil.tests import terminate
from psutil.tests import unittest
from psutil.tests import which


if BSD:
    from psutil._psutil_posix import getpagesize

    PAGESIZE = getpagesize()
    # muse requires root privileges
    MUSE_AVAILABLE = bool(os.getuid() == 0 and which('muse'))
else:
    PAGESIZE = None
    MUSE_AVAILABLE = False


def sysctl(cmdline):
    """Run "sysctl <cmdline>" and return only the value of interest.

    The "name: " (FreeBSD) or "name=" (OpenBSD, NetBSD) prefix is
    stripped from the command output. What is left is returned as an
    int when it parses as one, otherwise as a string.
    """
    result = sh("sysctl " + cmdline)
    if FREEBSD:
        separator = ": "
    elif OPENBSD or NETBSD:
        separator = "="
    else:
        separator = None
    if separator is not None:
        # Strip the prefix only when the separator is really there:
        # slicing on a failed find() would silently eat characters.
        _, found, value = result.partition(separator)
        if found:
            result = value
    try:
        return int(result)
    except ValueError:
        return result


def muse(field):
    """Thin wrapper around 'muse' cmdline utility.

    Return, as an int, the second whitespace-separated token of the
    first output line starting with *field*. Raise ValueError if no
    line matches.
    """
    out = sh('muse')
    for line in out.split('\n'):
        if line.startswith(field):
            return int(line.split()[1])
    raise ValueError("line not found")


# =====================================================================
# --- All BSD*
# =====================================================================


@unittest.skipIf(not BSD, "BSD only")
class BSDTestCase(PsutilTestCase):
    """Generic tests common to all BSD variants."""

    @classmethod
    def setUpClass(cls):
        # one helper process shared by all tests of this class
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    @unittest.skipIf(NETBSD, "-o lstart doesn't work on NETBSD")
    def test_process_create_time(self):
        """Compare Process.create_time() against "ps -o lstart"."""
        output = sh("ps -o lstart -p %s" % self.pid)
        start_ps = output.replace('STARTED', '').strip()
        start_psutil = psutil.Process(self.pid).create_time()
        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
                                     time.localtime(start_psutil))
        self.assertEqual(start_ps, start_psutil)

    def test_disks(self):
        # test psutil.disk_usage() and psutil.disk_partitions()
        # against "df -k"
        def df(path):
            """Return (device, total, used, free) in bytes for path."""
            out = sh('df -k "%s"' % path).strip()
            # first line is the header, the second one is the data row
            line = out.split('\n')[1]
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            # df -k reports 1K blocks
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        # 10 MB tolerance
        tolerance = 10 * 1024 * 1024
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
    def test_cpu_count_logical(self):
        syst = sysctl("hw.ncpu")
        self.assertEqual(psutil.cpu_count(logical=True), syst)

    @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
    def test_virtual_memory_total(self):
        num = sysctl('hw.physmem')
        self.assertEqual(num, psutil.virtual_memory().total)

    def test_net_if_stats(self):
        """Compare net_if_stats() against "ifconfig <name>" output."""
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # best effort: interfaces ifconfig can't show are skipped
                pass
            else:
                self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
                if "mtu" in out:
                    self.assertEqual(stats.mtu,
                                     int(re.findall(r'mtu (\d+)', out)[0]))


# =====================================================================
# --- FreeBSD
# =====================================================================


@unittest.skipIf(not FREEBSD, "FREEBSD only")
class FreeBSDPsutilTestCase(PsutilTestCase):
    """Process tests comparing psutil against FreeBSD's 'procstat'."""

    @classmethod
    def setUpClass(cls):
        # one helper process shared by all tests of this class
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    @retry_on_failure()
    def test_memory_maps(self):
        out = sh('procstat -v %s' % self.pid)
        maps = psutil.Process(self.pid).memory_maps(grouped=False)
        # drop the header line, then walk both listings from the end
        lines = out.split('\n')[1:]
        while lines:
            fields = lines.pop().split()
            _, start, stop, _, res = fields[:5]
            region = maps.pop()
            self.assertEqual("%s-%s" % (start, stop), region.addr)
            self.assertEqual(int(res), region.rss)
            if not region.path.startswith('['):
                self.assertEqual(fields[10], region.path)

    def test_exe(self):
        out = sh('procstat -b %s' % self.pid)
        # second line, last column
        self.assertEqual(psutil.Process(self.pid).exe(),
                         out.split('\n')[1].split()[-1])

    def test_cmdline(self):
        out = sh('procstat -c %s' % self.pid)
        # second line, everything from the third column onwards
        self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()),
                         ' '.join(out.split('\n')[1].split()[2:]))

    def test_uids_gids(self):
        out = sh('procstat -s %s' % self.pid)
        euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
        p = psutil.Process(self.pid)
        uids = p.uids()
        gids = p.gids()
        self.assertEqual(uids.real, int(ruid))
        self.assertEqual(uids.effective, int(euid))
        self.assertEqual(uids.saved, int(suid))
        self.assertEqual(gids.real, int(rgid))
        self.assertEqual(gids.effective, int(egid))
        self.assertEqual(gids.saved, int(sgid))

    @retry_on_failure()
    def test_ctx_switches(self):
        tested = 0
        out = sh('procstat -r %s' % self.pid)
        p = psutil.Process(self.pid)
        for line in out.split('\n'):
            line = line.lower().strip()
            # The leading space keeps "involuntary context" from
            # matching the first branch.
            if ' voluntary context' in line:
                pstat_value = int(line.split()[-1])
                psutil_value = p.num_ctx_switches().voluntary
                self.assertEqual(pstat_value, psutil_value)
                tested += 1
            elif ' involuntary context' in line:
                pstat_value = int(line.split()[-1])
                psutil_value = p.num_ctx_switches().involuntary
                self.assertEqual(pstat_value, psutil_value)
                tested += 1
        if tested != 2:
            raise RuntimeError("couldn't find lines match in procstat out")

    @retry_on_failure()
    def test_cpu_times(self):
        tested = 0
        out = sh('procstat -r %s' % self.pid)
        p = psutil.Process(self.pid)
        for line in out.split('\n'):
            line = line.lower().strip()
            # Only the digits after the last '.' of the procstat value
            # are kept, so the expected value is always below 1 second.
            if 'user time' in line:
                pstat_value = float('0.' + line.split()[-1].split('.')[-1])
                psutil_value = p.cpu_times().user
                self.assertEqual(pstat_value, psutil_value)
                tested += 1
            elif 'system time' in line:
                pstat_value = float('0.' + line.split()[-1].split('.')[-1])
                psutil_value = p.cpu_times().system
                self.assertEqual(pstat_value, psutil_value)
                tested += 1
        if tested != 2:
            raise RuntimeError("couldn't find lines match in procstat out")


@unittest.skipIf(not FREEBSD, "FREEBSD only")
class FreeBSDSystemTestCase(PsutilTestCase):
    """System-wide tests against sysctl, muse, swapinfo and acpiconf."""

    @staticmethod
    def parse_swapinfo():
        """Return (total, used, free) in bytes from "swapinfo -k".

        Raise ValueError if the last output line can't be parsed.
        """
        # the last line is always the total
        output = sh("swapinfo -k").splitlines()[-1]
        parts = re.split(r'\s+', output)

        # a leading column plus the three size columns are required
        if len(parts) < 4:
            raise ValueError("Can't parse swapinfo: %s" % output)

        # the size is in 1k units, so multiply by 1024
        total, used, free = (int(p) * 1024 for p in parts[1:4])
        return total, used, free

    def test_cpu_frequency_against_sysctl(self):
        # Currently only the frequency of cpu 0 is supported on FreeBSD.
        # All other cores use the same frequency.
        sensor = "dev.cpu.0.freq"
        try:
            sysctl_result = int(sysctl(sensor))
        except RuntimeError:
            self.skipTest("frequencies not supported by kernel")
        self.assertEqual(psutil.cpu_freq().current, sysctl_result)

        sensor = "dev.cpu.0.freq_levels"
        sysctl_result = sysctl(sensor)
        # sysctl returns a string of the format:
        # <freq_level_1>/<voltage_level_1> <freq_level_2>/<voltage_level_2>...
        # Ordered highest available to lowest available.
        max_freq = int(sysctl_result.split()[0].split("/")[0])
        min_freq = int(sysctl_result.split()[-1].split("/")[0])
        self.assertEqual(psutil.cpu_freq().max, max_freq)
        self.assertEqual(psutil.cpu_freq().min, min_freq)

    # --- virtual_memory(); tests against sysctl

    @retry_on_failure()
    def test_vmem_active(self):
        syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().active, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_inactive(self):
        syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_wired(self):
        syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_cached(self):
        syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_free(self):
        syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().free, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_buffers(self):
        # vfs.bufspace is compared as is, without PAGESIZE scaling
        syst = sysctl("vfs.bufspace")
        self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
                               delta=TOLERANCE_SYS_MEM)

    # --- virtual_memory(); tests against muse

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed or not root")
    def test_muse_vmem_total(self):
        num = muse('Total')
        self.assertEqual(psutil.virtual_memory().total, num)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed or not root")
    @retry_on_failure()
    def test_muse_vmem_active(self):
        num = muse('Active')
        self.assertAlmostEqual(psutil.virtual_memory().active, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed or not root")
    @retry_on_failure()
    def test_muse_vmem_inactive(self):
        num = muse('Inactive')
        self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed or not root")
    @retry_on_failure()
    def test_muse_vmem_wired(self):
        num = muse('Wired')
        self.assertAlmostEqual(psutil.virtual_memory().wired, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed or not root")
    @retry_on_failure()
    def test_muse_vmem_cached(self):
        num = muse('Cache')
        self.assertAlmostEqual(psutil.virtual_memory().cached, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed or not root")
    @retry_on_failure()
    def test_muse_vmem_free(self):
        num = muse('Free')
        self.assertAlmostEqual(psutil.virtual_memory().free, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed or not root")
    @retry_on_failure()
    def test_muse_vmem_buffers(self):
        num = muse('Buffer')
        self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
                               delta=TOLERANCE_SYS_MEM)

    def test_cpu_stats_ctx_switches(self):
        self.assertAlmostEqual(psutil.cpu_stats().ctx_switches,
                               sysctl('vm.stats.sys.v_swtch'), delta=1000)

    def test_cpu_stats_interrupts(self):
        self.assertAlmostEqual(psutil.cpu_stats().interrupts,
                               sysctl('vm.stats.sys.v_intr'), delta=1000)

    def test_cpu_stats_soft_interrupts(self):
        self.assertAlmostEqual(psutil.cpu_stats().soft_interrupts,
                               sysctl('vm.stats.sys.v_soft'), delta=1000)

    @retry_on_failure()
    def test_cpu_stats_syscalls(self):
        # pretty high tolerance but it looks like it's OK.
        self.assertAlmostEqual(psutil.cpu_stats().syscalls,
                               sysctl('vm.stats.sys.v_syscall'), delta=200000)

    # def test_cpu_stats_traps(self):
    #    self.assertAlmostEqual(psutil.cpu_stats().traps,
    #                           sysctl('vm.stats.sys.v_trap'), delta=1000)

    # --- swap memory

    def test_swapmem_free(self):
        _, _, free = self.parse_swapinfo()
        self.assertAlmostEqual(
            psutil.swap_memory().free, free, delta=TOLERANCE_SYS_MEM)

    def test_swapmem_used(self):
        _, used, _ = self.parse_swapinfo()
        self.assertAlmostEqual(
            psutil.swap_memory().used, used, delta=TOLERANCE_SYS_MEM)

    def test_swapmem_total(self):
        total, _, _ = self.parse_swapinfo()
        self.assertAlmostEqual(
            psutil.swap_memory().total, total, delta=TOLERANCE_SYS_MEM)

    # --- others

    def test_boot_time(self):
        # sysctl() already prepends the "sysctl" command name, so only
        # the variable name is passed here.
        s = sysctl('kern.boottime')
        # keep what sits between " sec = " and the following comma
        s = s[s.find(" sec = ") + 7:]
        s = s[:s.find(',')]
        btime = int(s)
        self.assertEqual(btime, psutil.boot_time())

    # --- sensors_battery

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery(self):
        def secs2hours(secs):
            """Format a number of seconds as "H:MM"."""
            m, _ = divmod(secs, 60)
            h, m = divmod(m, 60)
            return "%d:%02d" % (h, m)

        out = sh("acpiconf -i 0")
        # map the first tab-separated token of each line to the last one
        fields = {x.split('\t')[0]: x.split('\t')[-1]
                  for x in out.split("\n")}
        metrics = psutil.sensors_battery()
        percent = int(fields['Remaining capacity:'].replace('%', ''))
        remaining_time = fields['Remaining time:']
        self.assertEqual(metrics.percent, percent)
        if remaining_time == 'unknown':
            self.assertEqual(metrics.secsleft, psutil.POWER_TIME_UNLIMITED)
        else:
            self.assertEqual(secs2hours(metrics.secsleft), remaining_time)

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery_against_sysctl(self):
        # take a single snapshot so all three fields come from one read
        battery = psutil.sensors_battery()
        self.assertEqual(battery.percent, sysctl("hw.acpi.battery.life"))
        self.assertEqual(battery.power_plugged,
                         sysctl("hw.acpi.acline") == 1)
        secsleft = battery.secsleft
        if secsleft < 0:
            self.assertEqual(sysctl("hw.acpi.battery.time"), -1)
        else:
            # the sysctl value is scaled by 60 to compare with seconds
            self.assertEqual(secsleft, sysctl("hw.acpi.battery.time") * 60)

    @unittest.skipIf(HAS_BATTERY, "has battery")
    def test_sensors_battery_no_battery(self):
        # If no battery is present one of these calls is supposed
        # to fail, see:
        # https://github.com/giampaolo/psutil/issues/1074
        with self.assertRaises(RuntimeError):
            sysctl("hw.acpi.battery.life")
            sysctl("hw.acpi.battery.time")
            sysctl("hw.acpi.acline")
        self.assertIsNone(psutil.sensors_battery())

    # --- sensors_temperatures

    def test_sensors_temperatures_against_sysctl(self):
        num_cpus = psutil.cpu_count(True)
        for cpu in range(num_cpus):
            sensor = "dev.cpu.%s.temperature" % cpu
            # sysctl returns a string in the format 46.0C
            try:
                sysctl_result = int(float(sysctl(sensor)[:-1]))
            except RuntimeError:
                self.skipTest("temperatures not supported by kernel")
            self.assertAlmostEqual(
                psutil.sensors_temperatures()["coretemp"][cpu].current,
                sysctl_result, delta=10)

            sensor = "dev.cpu.%s.coretemp.tjmax" % cpu
            sysctl_result = int(float(sysctl(sensor)[:-1]))
            self.assertEqual(
                psutil.sensors_temperatures()["coretemp"][cpu].high,
                sysctl_result)


# =====================================================================
# --- OpenBSD
# =====================================================================


@unittest.skipIf(not OPENBSD, "OPENBSD only")
class OpenBSDTestCase(PsutilTestCase):
    """Tests specific to OpenBSD."""

    def test_boot_time(self):
        # kern.boottime is parsed here as a date string
        s = sysctl('kern.boottime')
        sys_bt = datetime.datetime.strptime(s, "%a %b %d %H:%M:%S %Y")
        psutil_bt = datetime.datetime.fromtimestamp(psutil.boot_time())
        self.assertEqual(sys_bt, psutil_bt)
# =====================================================================
# --- NetBSD
# =====================================================================


@unittest.skipIf(not NETBSD, "NETBSD only")
class NetBSDTestCase(PsutilTestCase):
    """Tests comparing psutil against /proc/meminfo and /proc/stat."""

    @staticmethod
    def parse_meminfo(look_for):
        """Return the /proc/meminfo value for *look_for* times 1024.

        The first line starting with *look_for* is used. Raise
        ValueError if there is no such line.
        """
        with open('/proc/meminfo', 'rt') as f:
            matches = (entry for entry in f if entry.startswith(look_for))
            hit = next(matches, None)
        if hit is None:
            raise ValueError("can't find %s" % look_for)
        return int(hit.split()[1]) * 1024

    @staticmethod
    def _proc_stat_value(prefix):
        """Return the int following *prefix* (bytes) in /proc/stat.

        Raise ValueError if no line starts with *prefix*.
        """
        with open('/proc/stat', 'rb') as f:
            for raw in f:
                if raw.startswith(prefix):
                    return int(raw.split()[1])
        raise ValueError("couldn't find line")

    def test_vmem_total(self):
        expected = self.parse_meminfo("MemTotal:")
        self.assertEqual(psutil.virtual_memory().total, expected)

    def test_vmem_free(self):
        actual = psutil.virtual_memory().free
        expected = self.parse_meminfo("MemFree:")
        self.assertAlmostEqual(actual, expected, delta=TOLERANCE_SYS_MEM)

    def test_vmem_buffers(self):
        actual = psutil.virtual_memory().buffers
        expected = self.parse_meminfo("Buffers:")
        self.assertAlmostEqual(actual, expected, delta=TOLERANCE_SYS_MEM)

    def test_vmem_shared(self):
        actual = psutil.virtual_memory().shared
        expected = self.parse_meminfo("MemShared:")
        self.assertAlmostEqual(actual, expected, delta=TOLERANCE_SYS_MEM)

    def test_swapmem_total(self):
        actual = psutil.swap_memory().total
        expected = self.parse_meminfo("SwapTotal:")
        self.assertAlmostEqual(actual, expected, delta=TOLERANCE_SYS_MEM)

    def test_swapmem_free(self):
        actual = psutil.swap_memory().free
        expected = self.parse_meminfo("SwapFree:")
        self.assertAlmostEqual(actual, expected, delta=TOLERANCE_SYS_MEM)

    def test_swapmem_used(self):
        # internal consistency check only: used == total - free
        swap = psutil.swap_memory()
        self.assertEqual(swap.used, swap.total - swap.free)

    def test_cpu_stats_interrupts(self):
        expected = self._proc_stat_value(b'intr')
        self.assertAlmostEqual(
            psutil.cpu_stats().interrupts, expected, delta=1000)

    def test_cpu_stats_ctx_switches(self):
        expected = self._proc_stat_value(b'ctxt')
        self.assertAlmostEqual(
            psutil.cpu_stats().ctx_switches, expected, delta=1000)


if __name__ == '__main__':
    from psutil.tests.runner import run_from_name
    run_from_name(__file__)