#!/usr/bin/env python3

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

# TODO: (FreeBSD) add test for comparing connections with 'sockstat' cmd.


"""Tests specific to all BSD platforms."""


import datetime
import os
import re
import time

import psutil
from psutil import BSD
from psutil import FREEBSD
from psutil import NETBSD
from psutil import OPENBSD
from psutil.tests import get_test_subprocess
from psutil.tests import HAS_BATTERY
from psutil.tests import MEMORY_TOLERANCE
from psutil.tests import reap_children
from psutil.tests import retry_on_failure
from psutil.tests import sh
from psutil.tests import unittest
from psutil.tests import which


if BSD:
    PAGESIZE = os.sysconf("SC_PAGE_SIZE")
    if os.getuid() == 0:  # muse requires root privileges
        MUSE_AVAILABLE = which('muse')
    else:
        MUSE_AVAILABLE = False
else:
    MUSE_AVAILABLE = False


def sysctl(cmdline):
    """Run ``sysctl <cmdline>`` and return only the value of interest.

    The "name<separator>" prefix is stripped from the command output
    (": " on FreeBSD, "=" on OpenBSD and NetBSD). If no separator is
    present the output is left untouched.

    Returns an int when the value parses as one, else the raw string.
    """
    result = sh("sysctl " + cmdline)
    if FREEBSD:
        separator = ": "
    elif OPENBSD or NETBSD:
        separator = "="
    else:
        separator = None
    if separator is not None:
        # partition() (unlike find() + offset) does not mangle the
        # output when the separator is missing.
        _, found, value = result.partition(separator)
        if found:
            result = value
    try:
        return int(result)
    except ValueError:
        return result


def muse(field):
    """Thin wrapper around 'muse' cmdline utility.

    Return, as an int, the second whitespace-separated token of the
    first output line starting with `field`. Raise ValueError if no
    line matches.
    """
    out = sh('muse')
    for line in out.split('\n'):
        if line.startswith(field):
            return int(line.split()[1])
    raise ValueError("line not found")


# =====================================================================
# --- All BSD*
# =====================================================================


@unittest.skipIf(not BSD, "BSD only")
class BSDTestCase(unittest.TestCase):
    """Generic tests common to all BSD variants."""

    @classmethod
    def setUpClass(cls):
        cls.pid = get_test_subprocess().pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    @unittest.skipIf(NETBSD, "-o lstart doesn't work on NETBSD")
    def test_process_create_time(self):
        """Compare Process.create_time() against 'ps -o lstart'."""
        output = sh("ps -o lstart -p %s" % self.pid)
        start_ps = output.replace('STARTED', '').strip()
        start_psutil = psutil.Process(self.pid).create_time()
        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
                                     time.localtime(start_psutil))
        self.assertEqual(start_ps, start_psutil)

    def test_disks(self):
        """Compare disk_usage() and disk_partitions() against 'df -k'."""

        def df(path):
            # Return (device, total, used, free) for `path`, sizes in bytes.
            out = sh('df -k "%s"' % path).strip()
            lines = out.split('\n')
            lines.pop(0)  # discard the header line
            line = lines.pop(0)
            dev, total, used, free = line.split()[:4]
            if dev == 'none':
                dev = ''
            # df -k reports 1K blocks
            total = int(total) * 1024
            used = int(used) * 1024
            free = int(free) * 1024
            return dev, total, used, free

        tolerance = 10 * 1024 * 1024  # 10 MB
        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            if abs(usage.free - free) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.free, free))
            if abs(usage.used - used) > tolerance:
                self.fail("psutil=%s, df=%s" % (usage.used, used))

    @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
    def test_cpu_count_logical(self):
        syst = sysctl("hw.ncpu")
        self.assertEqual(psutil.cpu_count(logical=True), syst)

    @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
    def test_virtual_memory_total(self):
        num = sysctl('hw.physmem')
        self.assertEqual(num, psutil.virtual_memory().total)

    def test_net_if_stats(self):
        """Compare net_if_stats() against 'ifconfig <name>' output."""
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # Best effort: skip interfaces the ifconfig call fails for.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            if "mtu" in out:
                self.assertEqual(stats.mtu,
                                 int(re.findall(r'mtu (\d+)', out)[0]))


# =====================================================================
# --- FreeBSD
# =====================================================================


@unittest.skipIf(not FREEBSD, "FREEBSD only")
class FreeBSDProcessTestCase(unittest.TestCase):
    """Process tests comparing psutil against the 'procstat' utility."""

    @classmethod
    def setUpClass(cls):
        cls.pid = get_test_subprocess().pid

    @classmethod
    def tearDownClass(cls):
        reap_children()

    @retry_on_failure()
    def test_memory_maps(self):
        out = sh('procstat -v %s' % self.pid)
        maps = psutil.Process(self.pid).memory_maps(grouped=False)
        lines = out.split('\n')[1:]  # drop the header line
        # Both lists are consumed from the end, pairing the last procstat
        # line with the last psutil entry and so on.
        while lines:
            line = lines.pop()
            fields = line.split()
            _, start, stop, perms, res = fields[:5]
            entry = maps.pop()
            self.assertEqual("%s-%s" % (start, stop), entry.addr)
            self.assertEqual(int(res), entry.rss)
            if not entry.path.startswith('['):
                self.assertEqual(fields[10], entry.path)

    def test_exe(self):
        out = sh('procstat -b %s' % self.pid)
        self.assertEqual(psutil.Process(self.pid).exe(),
                         out.split('\n')[1].split()[-1])

    def test_cmdline(self):
        out = sh('procstat -c %s' % self.pid)
        self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()),
                         ' '.join(out.split('\n')[1].split()[2:]))

    def test_uids_gids(self):
        out = sh('procstat -s %s' % self.pid)
        euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
        p = psutil.Process(self.pid)
        uids = p.uids()
        gids = p.gids()
        self.assertEqual(uids.real, int(ruid))
        self.assertEqual(uids.effective, int(euid))
        self.assertEqual(uids.saved, int(suid))
        self.assertEqual(gids.real, int(rgid))
        self.assertEqual(gids.effective, int(egid))
        self.assertEqual(gids.saved, int(sgid))

    @retry_on_failure()
    def test_ctx_switches(self):
        checked = 0
        out = sh('procstat -r %s' % self.pid)
        p = psutil.Process(self.pid)
        for line in out.split('\n'):
            line = line.lower().strip()
            # The leading space keeps "involuntary context" from matching
            # the "voluntary context" branch.
            if ' voluntary context' in line:
                pstat_value = int(line.split()[-1])
                psutil_value = p.num_ctx_switches().voluntary
                self.assertEqual(pstat_value, psutil_value)
                checked += 1
            elif ' involuntary context' in line:
                pstat_value = int(line.split()[-1])
                psutil_value = p.num_ctx_switches().involuntary
                self.assertEqual(pstat_value, psutil_value)
                checked += 1
        if checked != 2:
            raise RuntimeError("couldn't find lines match in procstat out")

    @retry_on_failure()
    def test_cpu_times(self):
        checked = 0
        out = sh('procstat -r %s' % self.pid)
        p = psutil.Process(self.pid)
        for line in out.split('\n'):
            line = line.lower().strip()
            # NOTE(review): only the part after the last '.' of procstat's
            # value is used (as "0.<digits>") -- confirm this is intended.
            if 'user time' in line:
                pstat_value = float('0.' + line.split()[-1].split('.')[-1])
                psutil_value = p.cpu_times().user
                self.assertEqual(pstat_value, psutil_value)
                checked += 1
            elif 'system time' in line:
                pstat_value = float('0.' + line.split()[-1].split('.')[-1])
                psutil_value = p.cpu_times().system
                self.assertEqual(pstat_value, psutil_value)
                checked += 1
        if checked != 2:
            raise RuntimeError("couldn't find lines match in procstat out")


@unittest.skipIf(not FREEBSD, "FREEBSD only")
class FreeBSDSystemTestCase(unittest.TestCase):
    """System-wide tests against sysctl, muse, swapinfo and acpiconf."""

    @staticmethod
    def parse_swapinfo():
        """Return (total, used, free) in bytes from 'swapinfo -k'.

        Raise ValueError if the last output line has too few fields.
        """
        # the last line is always the total
        output = sh("swapinfo -k").splitlines()[-1]
        parts = re.split(r'\s+', output)

        # Fields 1..3 are needed below, so at least 4 must be present.
        if len(parts) < 4:
            raise ValueError("Can't parse swapinfo: %s" % output)

        # the size is in 1k units, so multiply by 1024
        total, used, free = (int(p) * 1024 for p in parts[1:4])
        return total, used, free

    def test_cpu_frequency_against_sysctl(self):
        # Currently only the frequency of cpu 0 is supported in FreeBSD.
        # All other cores use the same frequency.
        sensor = "dev.cpu.0.freq"
        try:
            sysctl_result = int(sysctl(sensor))
        except RuntimeError:
            self.skipTest("frequencies not supported by kernel")
        self.assertEqual(psutil.cpu_freq().current, sysctl_result)

        sensor = "dev.cpu.0.freq_levels"
        sysctl_result = sysctl(sensor)
        # sysctl returns a string of the format:
        # <freq_level_1>/<voltage_level_1> <freq_level_2>/<voltage_level_2>...
        # Ordered highest available to lowest available.
        max_freq = int(sysctl_result.split()[0].split("/")[0])
        min_freq = int(sysctl_result.split()[-1].split("/")[0])
        self.assertEqual(psutil.cpu_freq().max, max_freq)
        self.assertEqual(psutil.cpu_freq().min, min_freq)

    # --- virtual_memory(); tests against sysctl

    @retry_on_failure()
    def test_vmem_active(self):
        syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().active, syst,
                               delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_inactive(self):
        syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
                               delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_wired(self):
        syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
                               delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_cached(self):
        syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
                               delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_free(self):
        syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().free, syst,
                               delta=MEMORY_TOLERANCE)

    @retry_on_failure()
    def test_vmem_buffers(self):
        syst = sysctl("vfs.bufspace")
        self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
                               delta=MEMORY_TOLERANCE)

    # --- virtual_memory(); tests against muse

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    def test_muse_vmem_total(self):
        num = muse('Total')
        self.assertEqual(psutil.virtual_memory().total, num)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_active(self):
        num = muse('Active')
        self.assertAlmostEqual(psutil.virtual_memory().active, num,
                               delta=MEMORY_TOLERANCE)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_inactive(self):
        num = muse('Inactive')
        self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
                               delta=MEMORY_TOLERANCE)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_wired(self):
        num = muse('Wired')
        self.assertAlmostEqual(psutil.virtual_memory().wired, num,
                               delta=MEMORY_TOLERANCE)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_cached(self):
        num = muse('Cache')
        self.assertAlmostEqual(psutil.virtual_memory().cached, num,
                               delta=MEMORY_TOLERANCE)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_free(self):
        num = muse('Free')
        self.assertAlmostEqual(psutil.virtual_memory().free, num,
                               delta=MEMORY_TOLERANCE)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_buffers(self):
        num = muse('Buffer')
        self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
                               delta=MEMORY_TOLERANCE)

    # --- cpu_stats(); tests against sysctl

    def test_cpu_stats_ctx_switches(self):
        self.assertAlmostEqual(psutil.cpu_stats().ctx_switches,
                               sysctl('vm.stats.sys.v_swtch'), delta=1000)

    def test_cpu_stats_interrupts(self):
        self.assertAlmostEqual(psutil.cpu_stats().interrupts,
                               sysctl('vm.stats.sys.v_intr'), delta=1000)

    def test_cpu_stats_soft_interrupts(self):
        self.assertAlmostEqual(psutil.cpu_stats().soft_interrupts,
                               sysctl('vm.stats.sys.v_soft'), delta=1000)

    @retry_on_failure()
    def test_cpu_stats_syscalls(self):
        # pretty high tolerance but it looks like it's OK.
        self.assertAlmostEqual(psutil.cpu_stats().syscalls,
                               sysctl('vm.stats.sys.v_syscall'), delta=200000)

    # def test_cpu_stats_traps(self):
    #    self.assertAlmostEqual(psutil.cpu_stats().traps,
    #                           sysctl('vm.stats.sys.v_trap'), delta=1000)

    # --- swap memory

    def test_swapmem_free(self):
        total, used, free = self.parse_swapinfo()
        self.assertAlmostEqual(
            psutil.swap_memory().free, free, delta=MEMORY_TOLERANCE)

    def test_swapmem_used(self):
        total, used, free = self.parse_swapinfo()
        self.assertAlmostEqual(
            psutil.swap_memory().used, used, delta=MEMORY_TOLERANCE)

    def test_swapmem_total(self):
        total, used, free = self.parse_swapinfo()
        self.assertAlmostEqual(
            psutil.swap_memory().total, total, delta=MEMORY_TOLERANCE)

    # --- others

    def test_boot_time(self):
        # sysctl() already prepends the "sysctl" command name, so only
        # the variable name is passed here.
        s = sysctl('kern.boottime')
        # Extract the integer between " sec = " and the following comma.
        s = s[s.find(" sec = ") + 7:]
        s = s[:s.find(',')]
        btime = int(s)
        self.assertEqual(btime, psutil.boot_time())

    # --- sensors_battery

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery(self):
        def secs2hours(secs):
            # Format a number of seconds as "H:MM".
            m, s = divmod(secs, 60)
            h, m = divmod(m, 60)
            return "%d:%02d" % (h, m)

        out = sh("acpiconf -i 0")
        # Map the first tab-separated token of each line to the last one.
        fields = {x.split('\t')[0]: x.split('\t')[-1]
                  for x in out.split("\n")}
        metrics = psutil.sensors_battery()
        percent = int(fields['Remaining capacity:'].replace('%', ''))
        remaining_time = fields['Remaining time:']
        self.assertEqual(metrics.percent, percent)
        if remaining_time == 'unknown':
            self.assertEqual(metrics.secsleft, psutil.POWER_TIME_UNLIMITED)
        else:
            self.assertEqual(secs2hours(metrics.secsleft), remaining_time)

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery_against_sysctl(self):
        self.assertEqual(psutil.sensors_battery().percent,
                         sysctl("hw.acpi.battery.life"))
        self.assertEqual(psutil.sensors_battery().power_plugged,
                         sysctl("hw.acpi.acline") == 1)
        secsleft = psutil.sensors_battery().secsleft
        if secsleft < 0:
            self.assertEqual(sysctl("hw.acpi.battery.time"), -1)
        else:
            # the sysctl value is multiplied by 60 to compare with seconds
            self.assertEqual(secsleft, sysctl("hw.acpi.battery.time") * 60)

    @unittest.skipIf(HAS_BATTERY, "has battery")
    def test_sensors_battery_no_battery(self):
        # If no battery is present one of these calls is supposed
        # to fail, see:
        # https://github.com/giampaolo/psutil/issues/1074
        with self.assertRaises(RuntimeError):
            sysctl("hw.acpi.battery.life")
            sysctl("hw.acpi.battery.time")
            sysctl("hw.acpi.acline")
        self.assertIsNone(psutil.sensors_battery())

    # --- sensors_temperatures

    def test_sensors_temperatures_against_sysctl(self):
        num_cpus = psutil.cpu_count(True)
        for cpu in range(num_cpus):
            sensor = "dev.cpu.%s.temperature" % cpu
            # sysctl returns a string in the format 46.0C
            try:
                sysctl_result = int(float(sysctl(sensor)[:-1]))
            except RuntimeError:
                self.skipTest("temperatures not supported by kernel")
            self.assertAlmostEqual(
                psutil.sensors_temperatures()["coretemp"][cpu].current,
                sysctl_result, delta=10)

            sensor = "dev.cpu.%s.coretemp.tjmax" % cpu
            sysctl_result = int(float(sysctl(sensor)[:-1]))
            self.assertEqual(
                psutil.sensors_temperatures()["coretemp"][cpu].high,
                sysctl_result)


# =====================================================================
# --- OpenBSD
# =====================================================================


@unittest.skipIf(not OPENBSD, "OPENBSD only")
class OpenBSDTestCase(unittest.TestCase):

    def test_boot_time(self):
        s = sysctl('kern.boottime')
        sys_bt = datetime.datetime.strptime(s, "%a %b %d %H:%M:%S %Y")
        psutil_bt = datetime.datetime.fromtimestamp(psutil.boot_time())
        self.assertEqual(sys_bt, psutil_bt)


# =====================================================================
# --- NetBSD
# =====================================================================


@unittest.skipIf(not NETBSD, "NETBSD only")
class NetBSDTestCase(unittest.TestCase):

    @staticmethod
    def parse_meminfo(look_for):
        """Return the /proc/meminfo value for the line starting with
        `look_for`, multiplied by 1024. Raise ValueError if no line
        matches.
        """
        with open('/proc/meminfo', 'rt') as f:
            for line in f:
                if line.startswith(look_for):
                    return int(line.split()[1]) * 1024
        raise ValueError("can't find %s" % look_for)

    def test_vmem_total(self):
        self.assertEqual(
            psutil.virtual_memory().total, self.parse_meminfo("MemTotal:"))

    def test_vmem_free(self):
        self.assertAlmostEqual(
            psutil.virtual_memory().free, self.parse_meminfo("MemFree:"),
            delta=MEMORY_TOLERANCE)

    def test_vmem_buffers(self):
        self.assertAlmostEqual(
            psutil.virtual_memory().buffers, self.parse_meminfo("Buffers:"),
            delta=MEMORY_TOLERANCE)

    def test_vmem_shared(self):
        self.assertAlmostEqual(
            psutil.virtual_memory().shared, self.parse_meminfo("MemShared:"),
            delta=MEMORY_TOLERANCE)

    def test_swapmem_total(self):
        self.assertAlmostEqual(
            psutil.swap_memory().total, self.parse_meminfo("SwapTotal:"),
            delta=MEMORY_TOLERANCE)

    def test_swapmem_free(self):
        self.assertAlmostEqual(
            psutil.swap_memory().free, self.parse_meminfo("SwapFree:"),
            delta=MEMORY_TOLERANCE)

    def test_swapmem_used(self):
        smem = psutil.swap_memory()
        self.assertEqual(smem.used, smem.total - smem.free)

    def test_cpu_stats_interrupts(self):
        with open('/proc/stat', 'rb') as f:
            for line in f:
                if line.startswith(b'intr'):
                    interrupts = int(line.split()[1])
                    break
            else:
                raise ValueError("couldn't find line")
        self.assertAlmostEqual(
            psutil.cpu_stats().interrupts, interrupts, delta=1000)

    def test_cpu_stats_ctx_switches(self):
        with open('/proc/stat', 'rb') as f:
            for line in f:
                if line.startswith(b'ctxt'):
                    ctx_switches = int(line.split()[1])
                    break
            else:
                raise ValueError("couldn't find line")
        self.assertAlmostEqual(
            psutil.cpu_stats().ctx_switches, ctx_switches, delta=1000)


if __name__ == '__main__':
    from psutil.tests.runner import run
    run(__file__)