1#!/usr/bin/env python3 2 3# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Linux specific tests.""" 8 9from __future__ import division 10import collections 11import contextlib 12import errno 13import glob 14import io 15import os 16import re 17import shutil 18import socket 19import struct 20import tempfile 21import textwrap 22import time 23import warnings 24 25import psutil 26from psutil import LINUX 27from psutil._compat import basestring 28from psutil._compat import FileNotFoundError 29from psutil._compat import PY3 30from psutil._compat import u 31from psutil.tests import call_until 32from psutil.tests import HAS_BATTERY 33from psutil.tests import HAS_CPU_FREQ 34from psutil.tests import HAS_GETLOADAVG 35from psutil.tests import HAS_RLIMIT 36from psutil.tests import MEMORY_TOLERANCE 37from psutil.tests import mock 38from psutil.tests import PYPY 39from psutil.tests import pyrun 40from psutil.tests import reap_children 41from psutil.tests import reload_module 42from psutil.tests import retry_on_failure 43from psutil.tests import safe_rmpath 44from psutil.tests import sh 45from psutil.tests import skip_on_not_implemented 46from psutil.tests import TESTFN 47from psutil.tests import ThreadTask 48from psutil.tests import TRAVIS 49from psutil.tests import unittest 50from psutil.tests import which 51 52 53HERE = os.path.abspath(os.path.dirname(__file__)) 54SIOCGIFADDR = 0x8915 55SIOCGIFCONF = 0x8912 56SIOCGIFHWADDR = 0x8927 57if LINUX: 58 SECTOR_SIZE = 512 59EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*') 60 61# ===================================================================== 62# --- utils 63# ===================================================================== 64 65 66def get_ipv4_address(ifname): 67 import fcntl 68 ifname = ifname[:15] 69 if PY3: 70 ifname = bytes(ifname, 'ascii') 71 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 72 with contextlib.closing(s): 73 return socket.inet_ntoa( 74 fcntl.ioctl(s.fileno(), 75 SIOCGIFADDR, 76 struct.pack('256s', ifname))[20:24]) 77 78 79def get_mac_address(ifname): 80 import fcntl 81 ifname = ifname[:15] 82 if PY3: 83 ifname = bytes(ifname, 'ascii') 84 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 85 with contextlib.closing(s): 86 info = fcntl.ioctl( 87 s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname)) 88 if PY3: 89 def ord(x): 90 return x 91 else: 92 import __builtin__ 93 ord = __builtin__.ord 94 return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1] 95 96 97def free_swap(): 98 """Parse 'free' cmd and return swap memory's s total, used and free 99 values. 100 """ 101 out = sh('free -b', env={"LANG": "C.UTF-8"}) 102 lines = out.split('\n') 103 for line in lines: 104 if line.startswith('Swap'): 105 _, total, used, free = line.split() 106 nt = collections.namedtuple('free', 'total used free') 107 return nt(int(total), int(used), int(free)) 108 raise ValueError( 109 "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines)) 110 111 112def free_physmem(): 113 """Parse 'free' cmd and return physical memory's total, used 114 and free values. 115 """ 116 # Note: free can have 2 different formats, invalidating 'shared' 117 # and 'cached' memory which may have different positions so we 118 # do not return them. 119 # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946 120 out = sh('free -b', env={"LANG": "C.UTF-8"}) 121 lines = out.split('\n') 122 for line in lines: 123 if line.startswith('Mem'): 124 total, used, free, shared = \ 125 [int(x) for x in line.split()[1:5]] 126 nt = collections.namedtuple( 127 'free', 'total used free shared output') 128 return nt(total, used, free, shared, out) 129 raise ValueError( 130 "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines)) 131 132 133def vmstat(stat): 134 out = sh("vmstat -s", env={"LANG": "C.UTF-8"}) 135 for line in out.split("\n"): 136 line = line.strip() 137 if stat in line: 138 return int(line.split(' ')[0]) 139 raise ValueError("can't find %r in 'vmstat' output" % stat) 140 141 142def get_free_version_info(): 143 out = sh("free -V").strip() 144 return tuple(map(int, out.split()[-1].split('.'))) 145 146 147@contextlib.contextmanager 148def mock_open_content(for_path, content): 149 """Mock open() builtin and forces it to return a certain `content` 150 on read() if the path being opened matches `for_path`. 151 """ 152 def open_mock(name, *args, **kwargs): 153 if name == for_path: 154 if PY3: 155 if isinstance(content, basestring): 156 return io.StringIO(content) 157 else: 158 return io.BytesIO(content) 159 else: 160 return io.BytesIO(content) 161 else: 162 return orig_open(name, *args, **kwargs) 163 164 orig_open = open 165 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 166 with mock.patch(patch_point, create=True, side_effect=open_mock) as m: 167 yield m 168 169 170@contextlib.contextmanager 171def mock_open_exception(for_path, exc): 172 """Mock open() builtin and raises `exc` if the path being opened 173 matches `for_path`. 174 """ 175 def open_mock(name, *args, **kwargs): 176 if name == for_path: 177 raise exc 178 else: 179 return orig_open(name, *args, **kwargs) 180 181 orig_open = open 182 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 183 with mock.patch(patch_point, create=True, side_effect=open_mock) as m: 184 yield m 185 186 187# ===================================================================== 188# --- system virtual memory 189# ===================================================================== 190 191 192@unittest.skipIf(not LINUX, "LINUX only") 193class TestSystemVirtualMemory(unittest.TestCase): 194 195 def test_total(self): 196 # free_value = free_physmem().total 197 # psutil_value = psutil.virtual_memory().total 198 # self.assertEqual(free_value, psutil_value) 199 vmstat_value = vmstat('total memory') * 1024 200 psutil_value = psutil.virtual_memory().total 201 self.assertAlmostEqual(vmstat_value, psutil_value) 202 203 # Older versions of procps used slab memory to calculate used memory. 204 # This got changed in: 205 # https://gitlab.com/procps-ng/procps/commit/ 206 # 05d751c4f076a2f0118b914c5e51cfbb4762ad8e 207 @unittest.skipIf(LINUX and get_free_version_info() < (3, 3, 12), 208 "old free version") 209 @retry_on_failure() 210 def test_used(self): 211 free = free_physmem() 212 free_value = free.used 213 psutil_value = psutil.virtual_memory().used 214 self.assertAlmostEqual( 215 free_value, psutil_value, delta=MEMORY_TOLERANCE, 216 msg='%s %s \n%s' % (free_value, psutil_value, free.output)) 217 218 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") 219 @retry_on_failure() 220 def test_free(self): 221 vmstat_value = vmstat('free memory') * 1024 222 psutil_value = psutil.virtual_memory().free 223 self.assertAlmostEqual( 224 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE) 225 226 @retry_on_failure() 227 def test_buffers(self): 228 vmstat_value = vmstat('buffer memory') * 1024 229 psutil_value = psutil.virtual_memory().buffers 230 self.assertAlmostEqual( 231 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE) 232 233 # https://travis-ci.org/giampaolo/psutil/jobs/226719664 234 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") 235 @retry_on_failure() 236 def test_active(self): 237 vmstat_value = vmstat('active memory') * 1024 238 psutil_value = psutil.virtual_memory().active 239 self.assertAlmostEqual( 240 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE) 241 242 # https://travis-ci.org/giampaolo/psutil/jobs/227242952 243 @unittest.skipIf(TRAVIS, "unreliable on TRAVIS") 244 @retry_on_failure() 245 def test_inactive(self): 246 vmstat_value = vmstat('inactive memory') * 1024 247 psutil_value = psutil.virtual_memory().inactive 248 self.assertAlmostEqual( 249 vmstat_value, psutil_value, delta=MEMORY_TOLERANCE) 250 251 @retry_on_failure() 252 def test_shared(self): 253 free = free_physmem() 254 free_value = free.shared 255 if free_value == 0: 256 raise unittest.SkipTest("free does not support 'shared' column") 257 psutil_value = psutil.virtual_memory().shared 258 self.assertAlmostEqual( 259 free_value, psutil_value, delta=MEMORY_TOLERANCE, 260 msg='%s %s \n%s' % (free_value, psutil_value, free.output)) 261 262 @retry_on_failure() 263 def test_available(self): 264 # "free" output format has changed at some point: 265 # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098 266 out = sh("free -b") 267 lines = out.split('\n') 268 if 'available' not in lines[0]: 269 raise unittest.SkipTest("free does not support 'available' column") 270 else: 271 free_value = int(lines[1].split()[-1]) 272 psutil_value = psutil.virtual_memory().available 273 self.assertAlmostEqual( 274 free_value, psutil_value, delta=MEMORY_TOLERANCE, 275 msg='%s %s \n%s' % (free_value, psutil_value, out)) 276 277 def test_warnings_on_misses(self): 278 # Emulate a case where /proc/meminfo provides few info. 279 # psutil is supposed to set the missing fields to 0 and 280 # raise a warning. 281 with mock_open_content( 282 '/proc/meminfo', 283 textwrap.dedent("""\ 284 Active(anon): 6145416 kB 285 Active(file): 2950064 kB 286 Inactive(anon): 574764 kB 287 Inactive(file): 1567648 kB 288 MemAvailable: -1 kB 289 MemFree: 2057400 kB 290 MemTotal: 16325648 kB 291 SReclaimable: 346648 kB 292 """).encode()) as m: 293 with warnings.catch_warnings(record=True) as ws: 294 warnings.simplefilter("always") 295 ret = psutil.virtual_memory() 296 assert m.called 297 self.assertEqual(len(ws), 1) 298 w = ws[0] 299 assert w.filename.endswith('psutil/_pslinux.py') 300 self.assertIn( 301 "memory stats couldn't be determined", str(w.message)) 302 self.assertIn("cached", str(w.message)) 303 self.assertIn("shared", str(w.message)) 304 self.assertIn("active", str(w.message)) 305 self.assertIn("inactive", str(w.message)) 306 self.assertIn("buffers", str(w.message)) 307 self.assertIn("available", str(w.message)) 308 self.assertEqual(ret.cached, 0) 309 self.assertEqual(ret.active, 0) 310 self.assertEqual(ret.inactive, 0) 311 self.assertEqual(ret.shared, 0) 312 self.assertEqual(ret.buffers, 0) 313 self.assertEqual(ret.available, 0) 314 self.assertEqual(ret.slab, 0) 315 316 @retry_on_failure() 317 def test_avail_old_percent(self): 318 # Make sure that our calculation of avail mem for old kernels 319 # is off by max 15%. 320 from psutil._pslinux import calculate_avail_vmem 321 from psutil._pslinux import open_binary 322 323 mems = {} 324 with open_binary('/proc/meminfo') as f: 325 for line in f: 326 fields = line.split() 327 mems[fields[0]] = int(fields[1]) * 1024 328 329 a = calculate_avail_vmem(mems) 330 if b'MemAvailable:' in mems: 331 b = mems[b'MemAvailable:'] 332 diff_percent = abs(a - b) / a * 100 333 self.assertLess(diff_percent, 15) 334 335 def test_avail_old_comes_from_kernel(self): 336 # Make sure "MemAvailable:" coluimn is used instead of relying 337 # on our internal algorithm to calculate avail mem. 338 with mock_open_content( 339 '/proc/meminfo', 340 textwrap.dedent("""\ 341 Active: 9444728 kB 342 Active(anon): 6145416 kB 343 Active(file): 2950064 kB 344 Buffers: 287952 kB 345 Cached: 4818144 kB 346 Inactive(file): 1578132 kB 347 Inactive(anon): 574764 kB 348 Inactive(file): 1567648 kB 349 MemAvailable: 6574984 kB 350 MemFree: 2057400 kB 351 MemTotal: 16325648 kB 352 Shmem: 577588 kB 353 SReclaimable: 346648 kB 354 """).encode()) as m: 355 with warnings.catch_warnings(record=True) as ws: 356 ret = psutil.virtual_memory() 357 assert m.called 358 self.assertEqual(ret.available, 6574984 * 1024) 359 w = ws[0] 360 self.assertIn( 361 "inactive memory stats couldn't be determined", str(w.message)) 362 363 def test_avail_old_missing_fields(self): 364 # Remove Active(file), Inactive(file) and SReclaimable 365 # from /proc/meminfo and make sure the fallback is used 366 # (free + cached), 367 with mock_open_content( 368 "/proc/meminfo", 369 textwrap.dedent("""\ 370 Active: 9444728 kB 371 Active(anon): 6145416 kB 372 Buffers: 287952 kB 373 Cached: 4818144 kB 374 Inactive(file): 1578132 kB 375 Inactive(anon): 574764 kB 376 MemFree: 2057400 kB 377 MemTotal: 16325648 kB 378 Shmem: 577588 kB 379 """).encode()) as m: 380 with warnings.catch_warnings(record=True) as ws: 381 ret = psutil.virtual_memory() 382 assert m.called 383 self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024) 384 w = ws[0] 385 self.assertIn( 386 "inactive memory stats couldn't be determined", str(w.message)) 387 388 def test_avail_old_missing_zoneinfo(self): 389 # Remove /proc/zoneinfo file. Make sure fallback is used 390 # (free + cached). 391 with mock_open_content( 392 "/proc/meminfo", 393 textwrap.dedent("""\ 394 Active: 9444728 kB 395 Active(anon): 6145416 kB 396 Active(file): 2950064 kB 397 Buffers: 287952 kB 398 Cached: 4818144 kB 399 Inactive(file): 1578132 kB 400 Inactive(anon): 574764 kB 401 Inactive(file): 1567648 kB 402 MemFree: 2057400 kB 403 MemTotal: 16325648 kB 404 Shmem: 577588 kB 405 SReclaimable: 346648 kB 406 """).encode()): 407 with mock_open_exception( 408 "/proc/zoneinfo", 409 IOError(errno.ENOENT, 'no such file or directory')): 410 with warnings.catch_warnings(record=True) as ws: 411 ret = psutil.virtual_memory() 412 self.assertEqual( 413 ret.available, 2057400 * 1024 + 4818144 * 1024) 414 w = ws[0] 415 self.assertIn( 416 "inactive memory stats couldn't be determined", 417 str(w.message)) 418 419 def test_virtual_memory_mocked(self): 420 # Emulate /proc/meminfo because neither vmstat nor free return slab. 421 def open_mock(name, *args, **kwargs): 422 if name == '/proc/meminfo': 423 return io.BytesIO(textwrap.dedent("""\ 424 MemTotal: 100 kB 425 MemFree: 2 kB 426 MemAvailable: 3 kB 427 Buffers: 4 kB 428 Cached: 5 kB 429 SwapCached: 6 kB 430 Active: 7 kB 431 Inactive: 8 kB 432 Active(anon): 9 kB 433 Inactive(anon): 10 kB 434 Active(file): 11 kB 435 Inactive(file): 12 kB 436 Unevictable: 13 kB 437 Mlocked: 14 kB 438 SwapTotal: 15 kB 439 SwapFree: 16 kB 440 Dirty: 17 kB 441 Writeback: 18 kB 442 AnonPages: 19 kB 443 Mapped: 20 kB 444 Shmem: 21 kB 445 Slab: 22 kB 446 SReclaimable: 23 kB 447 SUnreclaim: 24 kB 448 KernelStack: 25 kB 449 PageTables: 26 kB 450 NFS_Unstable: 27 kB 451 Bounce: 28 kB 452 WritebackTmp: 29 kB 453 CommitLimit: 30 kB 454 Committed_AS: 31 kB 455 VmallocTotal: 32 kB 456 VmallocUsed: 33 kB 457 VmallocChunk: 34 kB 458 HardwareCorrupted: 35 kB 459 AnonHugePages: 36 kB 460 ShmemHugePages: 37 kB 461 ShmemPmdMapped: 38 kB 462 CmaTotal: 39 kB 463 CmaFree: 40 kB 464 HugePages_Total: 41 kB 465 HugePages_Free: 42 kB 466 HugePages_Rsvd: 43 kB 467 HugePages_Surp: 44 kB 468 Hugepagesize: 45 kB 469 DirectMap46k: 46 kB 470 DirectMap47M: 47 kB 471 DirectMap48G: 48 kB 472 """).encode()) 473 else: 474 return orig_open(name, *args, **kwargs) 475 476 orig_open = open 477 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 478 with mock.patch(patch_point, create=True, side_effect=open_mock) as m: 479 mem = psutil.virtual_memory() 480 assert m.called 481 self.assertEqual(mem.total, 100 * 1024) 482 self.assertEqual(mem.free, 2 * 1024) 483 self.assertEqual(mem.buffers, 4 * 1024) 484 # cached mem also includes reclaimable memory 485 self.assertEqual(mem.cached, (5 + 23) * 1024) 486 self.assertEqual(mem.shared, 21 * 1024) 487 self.assertEqual(mem.active, 7 * 1024) 488 self.assertEqual(mem.inactive, 8 * 1024) 489 self.assertEqual(mem.slab, 22 * 1024) 490 self.assertEqual(mem.available, 3 * 1024) 491 492 493# ===================================================================== 494# --- system swap memory 495# ===================================================================== 496 497 498@unittest.skipIf(not LINUX, "LINUX only") 499class TestSystemSwapMemory(unittest.TestCase): 500 501 @staticmethod 502 def meminfo_has_swap_info(): 503 """Return True if /proc/meminfo provides swap metrics.""" 504 with open("/proc/meminfo") as f: 505 data = f.read() 506 return 'SwapTotal:' in data and 'SwapFree:' in data 507 508 def test_total(self): 509 free_value = free_swap().total 510 psutil_value = psutil.swap_memory().total 511 return self.assertAlmostEqual( 512 free_value, psutil_value, delta=MEMORY_TOLERANCE) 513 514 @retry_on_failure() 515 def test_used(self): 516 free_value = free_swap().used 517 psutil_value = psutil.swap_memory().used 518 return self.assertAlmostEqual( 519 free_value, psutil_value, delta=MEMORY_TOLERANCE) 520 521 @retry_on_failure() 522 def test_free(self): 523 free_value = free_swap().free 524 psutil_value = psutil.swap_memory().free 525 return self.assertAlmostEqual( 526 free_value, psutil_value, delta=MEMORY_TOLERANCE) 527 528 def test_missing_sin_sout(self): 529 with mock.patch('psutil._common.open', create=True) as m: 530 with warnings.catch_warnings(record=True) as ws: 531 warnings.simplefilter("always") 532 ret = psutil.swap_memory() 533 assert m.called 534 self.assertEqual(len(ws), 1) 535 w = ws[0] 536 assert w.filename.endswith('psutil/_pslinux.py') 537 self.assertIn( 538 "'sin' and 'sout' swap memory stats couldn't " 539 "be determined", str(w.message)) 540 self.assertEqual(ret.sin, 0) 541 self.assertEqual(ret.sout, 0) 542 543 def test_no_vmstat_mocked(self): 544 # see https://github.com/giampaolo/psutil/issues/722 545 with mock_open_exception( 546 "/proc/vmstat", 547 IOError(errno.ENOENT, 'no such file or directory')) as m: 548 with warnings.catch_warnings(record=True) as ws: 549 warnings.simplefilter("always") 550 ret = psutil.swap_memory() 551 assert m.called 552 self.assertEqual(len(ws), 1) 553 w = ws[0] 554 assert w.filename.endswith('psutil/_pslinux.py') 555 self.assertIn( 556 "'sin' and 'sout' swap memory stats couldn't " 557 "be determined and were set to 0", 558 str(w.message)) 559 self.assertEqual(ret.sin, 0) 560 self.assertEqual(ret.sout, 0) 561 562 def test_meminfo_against_sysinfo(self): 563 # Make sure the content of /proc/meminfo about swap memory 564 # matches sysinfo() syscall, see: 565 # https://github.com/giampaolo/psutil/issues/1015 566 if not self.meminfo_has_swap_info(): 567 return unittest.skip("/proc/meminfo has no swap metrics") 568 with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m: 569 swap = psutil.swap_memory() 570 assert not m.called 571 import psutil._psutil_linux as cext 572 _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo() 573 total *= unit_multiplier 574 free *= unit_multiplier 575 self.assertEqual(swap.total, total) 576 self.assertAlmostEqual(swap.free, free, delta=MEMORY_TOLERANCE) 577 578 def test_emulate_meminfo_has_no_metrics(self): 579 # Emulate a case where /proc/meminfo provides no swap metrics 580 # in which case sysinfo() syscall is supposed to be used 581 # as a fallback. 582 with mock_open_content("/proc/meminfo", b"") as m: 583 psutil.swap_memory() 584 assert m.called 585 586 587# ===================================================================== 588# --- system CPU 589# ===================================================================== 590 591 592@unittest.skipIf(not LINUX, "LINUX only") 593class TestSystemCPUTimes(unittest.TestCase): 594 595 @unittest.skipIf(TRAVIS, "unknown failure on travis") 596 def test_fields(self): 597 fields = psutil.cpu_times()._fields 598 kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0] 599 kernel_ver_info = tuple(map(int, kernel_ver.split('.'))) 600 if kernel_ver_info >= (2, 6, 11): 601 self.assertIn('steal', fields) 602 else: 603 self.assertNotIn('steal', fields) 604 if kernel_ver_info >= (2, 6, 24): 605 self.assertIn('guest', fields) 606 else: 607 self.assertNotIn('guest', fields) 608 if kernel_ver_info >= (3, 2, 0): 609 self.assertIn('guest_nice', fields) 610 else: 611 self.assertNotIn('guest_nice', fields) 612 613 614@unittest.skipIf(not LINUX, "LINUX only") 615class TestSystemCPUCountLogical(unittest.TestCase): 616 617 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"), 618 "/sys/devices/system/cpu/online does not exist") 619 def test_against_sysdev_cpu_online(self): 620 with open("/sys/devices/system/cpu/online") as f: 621 value = f.read().strip() 622 if "-" in str(value): 623 value = int(value.split('-')[1]) + 1 624 self.assertEqual(psutil.cpu_count(), value) 625 626 @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"), 627 "/sys/devices/system/cpu does not exist") 628 def test_against_sysdev_cpu_num(self): 629 ls = os.listdir("/sys/devices/system/cpu") 630 count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None]) 631 self.assertEqual(psutil.cpu_count(), count) 632 633 @unittest.skipIf(not which("nproc"), "nproc utility not available") 634 def test_against_nproc(self): 635 num = int(sh("nproc --all")) 636 self.assertEqual(psutil.cpu_count(logical=True), num) 637 638 @unittest.skipIf(not which("lscpu"), "lscpu utility not available") 639 def test_against_lscpu(self): 640 out = sh("lscpu -p") 641 num = len([x for x in out.split('\n') if not x.startswith('#')]) 642 self.assertEqual(psutil.cpu_count(logical=True), num) 643 644 def test_emulate_fallbacks(self): 645 import psutil._pslinux 646 original = psutil._pslinux.cpu_count_logical() 647 # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in 648 # order to cause the parsing of /proc/cpuinfo and /proc/stat. 649 with mock.patch( 650 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m: 651 self.assertEqual(psutil._pslinux.cpu_count_logical(), original) 652 assert m.called 653 654 # Let's have open() return emtpy data and make sure None is 655 # returned ('cause we mimick os.cpu_count()). 656 with mock.patch('psutil._common.open', create=True) as m: 657 self.assertIsNone(psutil._pslinux.cpu_count_logical()) 658 self.assertEqual(m.call_count, 2) 659 # /proc/stat should be the last one 660 self.assertEqual(m.call_args[0][0], '/proc/stat') 661 662 # Let's push this a bit further and make sure /proc/cpuinfo 663 # parsing works as expected. 664 with open('/proc/cpuinfo', 'rb') as f: 665 cpuinfo_data = f.read() 666 fake_file = io.BytesIO(cpuinfo_data) 667 with mock.patch('psutil._common.open', 668 return_value=fake_file, create=True) as m: 669 self.assertEqual(psutil._pslinux.cpu_count_logical(), original) 670 671 # Finally, let's make /proc/cpuinfo return meaningless data; 672 # this way we'll fall back on relying on /proc/stat 673 with mock_open_content('/proc/cpuinfo', b"") as m: 674 self.assertEqual(psutil._pslinux.cpu_count_logical(), original) 675 m.called 676 677 678@unittest.skipIf(not LINUX, "LINUX only") 679class TestSystemCPUCountPhysical(unittest.TestCase): 680 681 @unittest.skipIf(not which("lscpu"), "lscpu utility not available") 682 def test_against_lscpu(self): 683 out = sh("lscpu -p") 684 core_ids = set() 685 for line in out.split('\n'): 686 if not line.startswith('#'): 687 fields = line.split(',') 688 core_ids.add(fields[1]) 689 self.assertEqual(psutil.cpu_count(logical=False), len(core_ids)) 690 691 def test_emulate_none(self): 692 with mock.patch('glob.glob', return_value=[]) as m1: 693 with mock.patch('psutil._common.open', create=True) as m2: 694 self.assertIsNone(psutil._pslinux.cpu_count_physical()) 695 assert m1.called 696 assert m2.called 697 698 699@unittest.skipIf(not LINUX, "LINUX only") 700class TestSystemCPUFrequency(unittest.TestCase): 701 702 @unittest.skipIf(TRAVIS, "fails on Travis") 703 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") 704 def test_emulate_use_second_file(self): 705 # https://github.com/giampaolo/psutil/issues/981 706 def path_exists_mock(path): 707 if path.startswith("/sys/devices/system/cpu/cpufreq/policy"): 708 return False 709 else: 710 return orig_exists(path) 711 712 orig_exists = os.path.exists 713 with mock.patch("os.path.exists", side_effect=path_exists_mock, 714 create=True): 715 assert psutil.cpu_freq() 716 717 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") 718 def test_emulate_use_cpuinfo(self): 719 # Emulate a case where /sys/devices/system/cpu/cpufreq* does not 720 # exist and /proc/cpuinfo is used instead. 721 def path_exists_mock(path): 722 if path.startswith('/sys/devices/system/cpu/'): 723 return False 724 else: 725 if path == "/proc/cpuinfo": 726 flags.append(None) 727 return os_path_exists(path) 728 729 flags = [] 730 os_path_exists = os.path.exists 731 try: 732 with mock.patch("os.path.exists", side_effect=path_exists_mock): 733 reload_module(psutil._pslinux) 734 ret = psutil.cpu_freq() 735 assert ret 736 assert flags 737 self.assertEqual(ret.max, 0.0) 738 self.assertEqual(ret.min, 0.0) 739 for freq in psutil.cpu_freq(percpu=True): 740 self.assertEqual(ret.max, 0.0) 741 self.assertEqual(ret.min, 0.0) 742 finally: 743 reload_module(psutil._pslinux) 744 reload_module(psutil) 745 746 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") 747 def test_emulate_data(self): 748 def open_mock(name, *args, **kwargs): 749 if (name.endswith('/scaling_cur_freq') and 750 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): 751 return io.BytesIO(b"500000") 752 elif (name.endswith('/scaling_min_freq') and 753 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): 754 return io.BytesIO(b"600000") 755 elif (name.endswith('/scaling_max_freq') and 756 name.startswith("/sys/devices/system/cpu/cpufreq/policy")): 757 return io.BytesIO(b"700000") 758 elif name == '/proc/cpuinfo': 759 return io.BytesIO(b"cpu MHz : 500") 760 else: 761 return orig_open(name, *args, **kwargs) 762 763 orig_open = open 764 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 765 with mock.patch(patch_point, side_effect=open_mock): 766 with mock.patch( 767 'os.path.exists', return_value=True): 768 freq = psutil.cpu_freq() 769 self.assertEqual(freq.current, 500.0) 770 # when /proc/cpuinfo is used min and max frequencies are not 771 # available and are set to 0. 772 if freq.min != 0.0: 773 self.assertEqual(freq.min, 600.0) 774 if freq.max != 0.0: 775 self.assertEqual(freq.max, 700.0) 776 777 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") 778 def test_emulate_multi_cpu(self): 779 def open_mock(name, *args, **kwargs): 780 n = name 781 if (n.endswith('/scaling_cur_freq') and 782 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")): 783 return io.BytesIO(b"100000") 784 elif (n.endswith('/scaling_min_freq') and 785 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")): 786 return io.BytesIO(b"200000") 787 elif (n.endswith('/scaling_max_freq') and 788 n.startswith("/sys/devices/system/cpu/cpufreq/policy0")): 789 return io.BytesIO(b"300000") 790 elif (n.endswith('/scaling_cur_freq') and 791 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")): 792 return io.BytesIO(b"400000") 793 elif (n.endswith('/scaling_min_freq') and 794 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")): 795 return io.BytesIO(b"500000") 796 elif (n.endswith('/scaling_max_freq') and 797 n.startswith("/sys/devices/system/cpu/cpufreq/policy1")): 798 return io.BytesIO(b"600000") 799 elif name == '/proc/cpuinfo': 800 return io.BytesIO(b"cpu MHz : 100\n" 801 b"cpu MHz : 400") 802 else: 803 return orig_open(name, *args, **kwargs) 804 805 orig_open = open 806 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 807 with mock.patch(patch_point, side_effect=open_mock): 808 with mock.patch('os.path.exists', return_value=True): 809 with mock.patch('psutil._pslinux.cpu_count_logical', 810 return_value=2): 811 freq = psutil.cpu_freq(percpu=True) 812 self.assertEqual(freq[0].current, 100.0) 813 if freq[0].min != 0.0: 814 self.assertEqual(freq[0].min, 200.0) 815 if freq[0].max != 0.0: 816 self.assertEqual(freq[0].max, 300.0) 817 self.assertEqual(freq[1].current, 400.0) 818 if freq[1].min != 0.0: 819 self.assertEqual(freq[1].min, 500.0) 820 if freq[1].max != 0.0: 821 self.assertEqual(freq[1].max, 600.0) 822 823 @unittest.skipIf(TRAVIS, "fails on Travis") 824 @unittest.skipIf(not HAS_CPU_FREQ, "not supported") 825 def test_emulate_no_scaling_cur_freq_file(self): 826 # See: https://github.com/giampaolo/psutil/issues/1071 827 def open_mock(name, *args, **kwargs): 828 if name.endswith('/scaling_cur_freq'): 829 raise IOError(errno.ENOENT, "") 830 elif name.endswith('/cpuinfo_cur_freq'): 831 return io.BytesIO(b"200000") 832 elif name == '/proc/cpuinfo': 833 return io.BytesIO(b"cpu MHz : 200") 834 else: 835 return orig_open(name, *args, **kwargs) 836 837 orig_open = open 838 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 839 with mock.patch(patch_point, side_effect=open_mock): 840 with mock.patch('os.path.exists', return_value=True): 841 with mock.patch('psutil._pslinux.cpu_count_logical', 842 return_value=1): 843 freq = psutil.cpu_freq() 844 self.assertEqual(freq.current, 200) 845 846 847@unittest.skipIf(not LINUX, "LINUX only") 848class TestSystemCPUStats(unittest.TestCase): 849 850 @unittest.skipIf(TRAVIS, "fails on Travis") 851 def test_ctx_switches(self): 852 vmstat_value = vmstat("context switches") 853 psutil_value = psutil.cpu_stats().ctx_switches 854 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500) 855 856 @unittest.skipIf(TRAVIS, "fails on Travis") 857 def test_interrupts(self): 858 vmstat_value = vmstat("interrupts") 859 psutil_value = psutil.cpu_stats().interrupts 860 self.assertAlmostEqual(vmstat_value, psutil_value, delta=500) 861 862 863@unittest.skipIf(not LINUX, "LINUX only") 864class TestLoadAvg(unittest.TestCase): 865 866 @unittest.skipIf(not HAS_GETLOADAVG, "not supported") 867 def test_getloadavg(self): 868 psutil_value = psutil.getloadavg() 869 with open("/proc/loadavg", "r") as f: 870 proc_value = f.read().split() 871 872 self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1) 873 self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1) 874 self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1) 875 876 877# ===================================================================== 878# --- system network 879# ===================================================================== 880 881 882@unittest.skipIf(not LINUX, "LINUX only") 883class TestSystemNetIfAddrs(unittest.TestCase): 884 885 def test_ips(self): 886 for name, addrs in psutil.net_if_addrs().items(): 887 for addr in addrs: 888 if addr.family == psutil.AF_LINK: 889 self.assertEqual(addr.address, get_mac_address(name)) 890 elif addr.family == socket.AF_INET: 891 self.assertEqual(addr.address, get_ipv4_address(name)) 892 # TODO: test for AF_INET6 family 893 894 # XXX - not reliable when having virtual NICs installed by Docker. 895 # @unittest.skipIf(not which('ip'), "'ip' utility not available") 896 # @unittest.skipIf(TRAVIS, "skipped on Travis") 897 # def test_net_if_names(self): 898 # out = sh("ip addr").strip() 899 # nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x] 900 # found = 0 901 # for line in out.split('\n'): 902 # line = line.strip() 903 # if re.search(r"^\d+:", line): 904 # found += 1 905 # name = line.split(':')[1].strip() 906 # self.assertIn(name, nics) 907 # self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( 908 # pprint.pformat(nics), out)) 909 910 911@unittest.skipIf(not LINUX, "LINUX only") 912class TestSystemNetIfStats(unittest.TestCase): 913 914 def test_against_ifconfig(self): 915 for name, stats in psutil.net_if_stats().items(): 916 try: 917 out = sh("ifconfig %s" % name) 918 except RuntimeError: 919 pass 920 else: 921 # Not always reliable. 922 # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out) 923 self.assertEqual(stats.mtu, 924 int(re.findall(r'(?i)MTU[: ](\d+)', out)[0])) 925 926 927@unittest.skipIf(not LINUX, "LINUX only") 928class TestSystemNetIOCounters(unittest.TestCase): 929 930 @retry_on_failure() 931 def test_against_ifconfig(self): 932 def ifconfig(nic): 933 ret = {} 934 out = sh("ifconfig %s" % name) 935 ret['packets_recv'] = int( 936 re.findall(r'RX packets[: ](\d+)', out)[0]) 937 ret['packets_sent'] = int( 938 re.findall(r'TX packets[: ](\d+)', out)[0]) 939 ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0]) 940 ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1]) 941 ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0]) 942 ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1]) 943 ret['bytes_recv'] = int( 944 re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0]) 945 ret['bytes_sent'] = int( 946 re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0]) 947 return ret 948 949 nio = psutil.net_io_counters(pernic=True, nowrap=False) 950 for name, stats in nio.items(): 951 try: 952 ifconfig_ret = ifconfig(name) 953 except RuntimeError: 954 continue 955 self.assertAlmostEqual( 956 stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5) 957 self.assertAlmostEqual( 958 stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5) 959 self.assertAlmostEqual( 960 stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024) 961 self.assertAlmostEqual( 962 stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024) 963 self.assertAlmostEqual( 964 stats.errin, ifconfig_ret['errin'], delta=10) 965 self.assertAlmostEqual( 966 stats.errout, ifconfig_ret['errout'], delta=10) 967 self.assertAlmostEqual( 968 stats.dropin, ifconfig_ret['dropin'], delta=10) 969 self.assertAlmostEqual( 970 stats.dropout, ifconfig_ret['dropout'], delta=10) 971 972 973@unittest.skipIf(not LINUX, "LINUX only") 974class TestSystemNetConnections(unittest.TestCase): 975 976 @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError) 977 @mock.patch('psutil._pslinux.supports_ipv6', return_value=False) 978 def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop): 979 # see: https://github.com/giampaolo/psutil/issues/623 980 try: 981 s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM) 982 self.addCleanup(s.close) 983 s.bind(("::1", 0)) 984 except socket.error: 985 pass 986 psutil.net_connections(kind='inet6') 987 988 def test_emulate_unix(self): 989 with mock_open_content( 990 '/proc/net/unix', 991 textwrap.dedent("""\ 992 0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n 993 0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ 994 0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O 995 000000000000000000000000000000000000000000000000000000 996 """)) as m: 997 psutil.net_connections(kind='unix') 998 assert m.called 999 1000 1001# ===================================================================== 1002# --- system disks 1003# ===================================================================== 1004 1005 1006@unittest.skipIf(not LINUX, "LINUX only") 1007class TestSystemDiskPartitions(unittest.TestCase): 1008 1009 @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available") 1010 @skip_on_not_implemented() 1011 def test_against_df(self): 1012 # test psutil.disk_usage() and psutil.disk_partitions() 1013 # against "df -a" 1014 def df(path): 1015 out = sh('df -P -B 1 "%s"' % path).strip() 1016 lines = out.split('\n') 1017 lines.pop(0) 1018 line = lines.pop(0) 1019 dev, total, used, free = line.split()[:4] 1020 if dev == 'none': 1021 dev = '' 1022 total, used, free = int(total), int(used), int(free) 1023 return dev, total, used, free 1024 1025 for part in psutil.disk_partitions(all=False): 1026 usage = psutil.disk_usage(part.mountpoint) 1027 dev, total, used, free = df(part.mountpoint) 1028 self.assertEqual(usage.total, total) 1029 # 10 MB tollerance 1030 if abs(usage.free - free) > 10 * 1024 * 1024: 1031 self.fail("psutil=%s, df=%s" % (usage.free, free)) 1032 if abs(usage.used - used) > 10 * 1024 * 1024: 1033 self.fail("psutil=%s, df=%s" % (usage.used, used)) 1034 1035 def test_zfs_fs(self): 1036 # Test that ZFS partitions are returned. 1037 with open("/proc/filesystems", "r") as f: 1038 data = f.read() 1039 if 'zfs' in data: 1040 for part in psutil.disk_partitions(): 1041 if part.fstype == 'zfs': 1042 break 1043 else: 1044 self.fail("couldn't find any ZFS partition") 1045 else: 1046 # No ZFS partitions on this system. Let's fake one. 1047 fake_file = io.StringIO(u("nodev\tzfs\n")) 1048 with mock.patch('psutil._common.open', 1049 return_value=fake_file, create=True) as m1: 1050 with mock.patch( 1051 'psutil._pslinux.cext.disk_partitions', 1052 return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2: 1053 ret = psutil.disk_partitions() 1054 assert m1.called 1055 assert m2.called 1056 assert ret 1057 self.assertEqual(ret[0].fstype, 'zfs') 1058 1059 def test_emulate_realpath_fail(self): 1060 # See: https://github.com/giampaolo/psutil/issues/1307 1061 try: 1062 with mock.patch('os.path.realpath', 1063 return_value='/non/existent') as m: 1064 with self.assertRaises(FileNotFoundError): 1065 psutil.disk_partitions() 1066 assert m.called 1067 finally: 1068 psutil.PROCFS_PATH = "/proc" 1069 1070 1071@unittest.skipIf(not LINUX, "LINUX only") 1072class TestSystemDiskIoCounters(unittest.TestCase): 1073 1074 def test_emulate_kernel_2_4(self): 1075 # Tests /proc/diskstats parsing format for 2.4 kernels, see: 1076 # https://github.com/giampaolo/psutil/issues/767 1077 with mock_open_content( 1078 '/proc/diskstats', 1079 " 3 0 1 hda 2 3 4 5 6 7 8 9 10 11 12"): 1080 with mock.patch('psutil._pslinux.is_storage_device', 1081 return_value=True): 1082 ret = psutil.disk_io_counters(nowrap=False) 1083 self.assertEqual(ret.read_count, 1) 1084 self.assertEqual(ret.read_merged_count, 2) 1085 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE) 1086 self.assertEqual(ret.read_time, 4) 1087 self.assertEqual(ret.write_count, 5) 1088 self.assertEqual(ret.write_merged_count, 6) 1089 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE) 1090 self.assertEqual(ret.write_time, 8) 1091 self.assertEqual(ret.busy_time, 10) 1092 1093 def test_emulate_kernel_2_6_full(self): 1094 # Tests /proc/diskstats parsing format for 2.6 kernels, 1095 # lines reporting all metrics: 1096 # https://github.com/giampaolo/psutil/issues/767 1097 with mock_open_content( 1098 '/proc/diskstats', 1099 " 3 0 hda 1 2 3 4 5 6 7 8 9 10 11"): 1100 with mock.patch('psutil._pslinux.is_storage_device', 1101 return_value=True): 1102 ret = psutil.disk_io_counters(nowrap=False) 1103 self.assertEqual(ret.read_count, 1) 1104 self.assertEqual(ret.read_merged_count, 2) 1105 self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE) 1106 self.assertEqual(ret.read_time, 4) 1107 self.assertEqual(ret.write_count, 5) 1108 self.assertEqual(ret.write_merged_count, 6) 1109 self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE) 1110 self.assertEqual(ret.write_time, 8) 1111 self.assertEqual(ret.busy_time, 10) 1112 1113 def test_emulate_kernel_2_6_limited(self): 1114 # Tests /proc/diskstats parsing format for 2.6 kernels, 1115 # where one line of /proc/partitions return a limited 1116 # amount of metrics when it bumps into a partition 1117 # (instead of a disk). See: 1118 # https://github.com/giampaolo/psutil/issues/767 1119 with mock_open_content( 1120 '/proc/diskstats', 1121 " 3 1 hda 1 2 3 4"): 1122 with mock.patch('psutil._pslinux.is_storage_device', 1123 return_value=True): 1124 ret = psutil.disk_io_counters(nowrap=False) 1125 self.assertEqual(ret.read_count, 1) 1126 self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE) 1127 self.assertEqual(ret.write_count, 3) 1128 self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE) 1129 1130 self.assertEqual(ret.read_merged_count, 0) 1131 self.assertEqual(ret.read_time, 0) 1132 self.assertEqual(ret.write_merged_count, 0) 1133 self.assertEqual(ret.write_time, 0) 1134 self.assertEqual(ret.busy_time, 0) 1135 1136 def test_emulate_include_partitions(self): 1137 # Make sure that when perdisk=True disk partitions are returned, 1138 # see: 1139 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842 1140 with mock_open_content( 1141 '/proc/diskstats', 1142 textwrap.dedent("""\ 1143 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11 1144 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11 1145 """)): 1146 with mock.patch('psutil._pslinux.is_storage_device', 1147 return_value=False): 1148 ret = psutil.disk_io_counters(perdisk=True, nowrap=False) 1149 self.assertEqual(len(ret), 2) 1150 self.assertEqual(ret['nvme0n1'].read_count, 1) 1151 self.assertEqual(ret['nvme0n1p1'].read_count, 1) 1152 self.assertEqual(ret['nvme0n1'].write_count, 5) 1153 self.assertEqual(ret['nvme0n1p1'].write_count, 5) 1154 1155 def test_emulate_exclude_partitions(self): 1156 # Make sure that when perdisk=False partitions (e.g. 'sda1', 1157 # 'nvme0n1p1') are skipped and not included in the total count. 1158 # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842 1159 with mock_open_content( 1160 '/proc/diskstats', 1161 textwrap.dedent("""\ 1162 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11 1163 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11 1164 """)): 1165 with mock.patch('psutil._pslinux.is_storage_device', 1166 return_value=False): 1167 ret = psutil.disk_io_counters(perdisk=False, nowrap=False) 1168 self.assertIsNone(ret) 1169 1170 # 1171 def is_storage_device(name): 1172 return name == 'nvme0n1' 1173 1174 with mock_open_content( 1175 '/proc/diskstats', 1176 textwrap.dedent("""\ 1177 3 0 nvme0n1 1 2 3 4 5 6 7 8 9 10 11 1178 3 0 nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11 1179 """)): 1180 with mock.patch('psutil._pslinux.is_storage_device', 1181 create=True, side_effect=is_storage_device): 1182 ret = psutil.disk_io_counters(perdisk=False, nowrap=False) 1183 self.assertEqual(ret.read_count, 1) 1184 self.assertEqual(ret.write_count, 5) 1185 1186 def test_emulate_use_sysfs(self): 1187 def exists(path): 1188 if path == '/proc/diskstats': 1189 return False 1190 return True 1191 1192 wprocfs = psutil.disk_io_counters(perdisk=True) 1193 with mock.patch('psutil._pslinux.os.path.exists', 1194 create=True, side_effect=exists): 1195 wsysfs = psutil.disk_io_counters(perdisk=True) 1196 self.assertEqual(len(wprocfs), len(wsysfs)) 1197 1198 def test_emulate_not_impl(self): 1199 def exists(path): 1200 return False 1201 1202 with mock.patch('psutil._pslinux.os.path.exists', 1203 create=True, side_effect=exists): 1204 self.assertRaises(NotImplementedError, psutil.disk_io_counters) 1205 1206 1207# ===================================================================== 1208# --- misc 1209# ===================================================================== 1210 1211 1212@unittest.skipIf(not LINUX, "LINUX only") 1213class TestMisc(unittest.TestCase): 1214 1215 def test_boot_time(self): 1216 vmstat_value = vmstat('boot time') 1217 psutil_value = psutil.boot_time() 1218 self.assertEqual(int(vmstat_value), int(psutil_value)) 1219 1220 def test_no_procfs_on_import(self): 1221 my_procfs = tempfile.mkdtemp() 1222 1223 with open(os.path.join(my_procfs, 'stat'), 'w') as f: 1224 f.write('cpu 0 0 0 0 0 0 0 0 0 0\n') 1225 f.write('cpu0 0 0 0 0 0 0 0 0 0 0\n') 1226 f.write('cpu1 0 0 0 0 0 0 0 0 0 0\n') 1227 1228 try: 1229 orig_open = open 1230 1231 def open_mock(name, *args, **kwargs): 1232 if name.startswith('/proc'): 1233 raise IOError(errno.ENOENT, 'rejecting access for test') 1234 return orig_open(name, *args, **kwargs) 1235 1236 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1237 with mock.patch(patch_point, side_effect=open_mock): 1238 reload_module(psutil) 1239 1240 self.assertRaises(IOError, psutil.cpu_times) 1241 self.assertRaises(IOError, psutil.cpu_times, percpu=True) 1242 self.assertRaises(IOError, psutil.cpu_percent) 1243 self.assertRaises(IOError, psutil.cpu_percent, percpu=True) 1244 self.assertRaises(IOError, psutil.cpu_times_percent) 1245 self.assertRaises( 1246 IOError, psutil.cpu_times_percent, percpu=True) 1247 1248 psutil.PROCFS_PATH = my_procfs 1249 1250 self.assertEqual(psutil.cpu_percent(), 0) 1251 self.assertEqual(sum(psutil.cpu_times_percent()), 0) 1252 1253 # since we don't know the number of CPUs at import time, 1254 # we awkwardly say there are none until the second call 1255 per_cpu_percent = psutil.cpu_percent(percpu=True) 1256 self.assertEqual(sum(per_cpu_percent), 0) 1257 1258 # ditto awkward length 1259 per_cpu_times_percent = psutil.cpu_times_percent(percpu=True) 1260 self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0) 1261 1262 # much user, very busy 1263 with open(os.path.join(my_procfs, 'stat'), 'w') as f: 1264 f.write('cpu 1 0 0 0 0 0 0 0 0 0\n') 1265 f.write('cpu0 1 0 0 0 0 0 0 0 0 0\n') 1266 f.write('cpu1 1 0 0 0 0 0 0 0 0 0\n') 1267 1268 self.assertNotEqual(psutil.cpu_percent(), 0) 1269 self.assertNotEqual( 1270 sum(psutil.cpu_percent(percpu=True)), 0) 1271 self.assertNotEqual(sum(psutil.cpu_times_percent()), 0) 1272 self.assertNotEqual( 1273 sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0) 1274 finally: 1275 shutil.rmtree(my_procfs) 1276 reload_module(psutil) 1277 1278 self.assertEqual(psutil.PROCFS_PATH, '/proc') 1279 1280 def test_cpu_steal_decrease(self): 1281 # Test cumulative cpu stats decrease. We should ignore this. 1282 # See issue #1210. 1283 with mock_open_content( 1284 "/proc/stat", 1285 textwrap.dedent("""\ 1286 cpu 0 0 0 0 0 0 0 1 0 0 1287 cpu0 0 0 0 0 0 0 0 1 0 0 1288 cpu1 0 0 0 0 0 0 0 1 0 0 1289 """).encode()) as m: 1290 # first call to "percent" functions should read the new stat file 1291 # and compare to the "real" file read at import time - so the 1292 # values are meaningless 1293 psutil.cpu_percent() 1294 assert m.called 1295 psutil.cpu_percent(percpu=True) 1296 psutil.cpu_times_percent() 1297 psutil.cpu_times_percent(percpu=True) 1298 1299 with mock_open_content( 1300 "/proc/stat", 1301 textwrap.dedent("""\ 1302 cpu 1 0 0 0 0 0 0 0 0 0 1303 cpu0 1 0 0 0 0 0 0 0 0 0 1304 cpu1 1 0 0 0 0 0 0 0 0 0 1305 """).encode()) as m: 1306 # Increase "user" while steal goes "backwards" to zero. 1307 cpu_percent = psutil.cpu_percent() 1308 assert m.called 1309 cpu_percent_percpu = psutil.cpu_percent(percpu=True) 1310 cpu_times_percent = psutil.cpu_times_percent() 1311 cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True) 1312 self.assertNotEqual(cpu_percent, 0) 1313 self.assertNotEqual(sum(cpu_percent_percpu), 0) 1314 self.assertNotEqual(sum(cpu_times_percent), 0) 1315 self.assertNotEqual(sum(cpu_times_percent), 100.0) 1316 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0) 1317 self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0) 1318 self.assertEqual(cpu_times_percent.steal, 0) 1319 self.assertNotEqual(cpu_times_percent.user, 0) 1320 1321 def test_boot_time_mocked(self): 1322 with mock.patch('psutil._common.open', create=True) as m: 1323 self.assertRaises( 1324 RuntimeError, 1325 psutil._pslinux.boot_time) 1326 assert m.called 1327 1328 def test_users_mocked(self): 1329 # Make sure ':0' and ':0.0' (returned by C ext) are converted 1330 # to 'localhost'. 1331 with mock.patch('psutil._pslinux.cext.users', 1332 return_value=[('giampaolo', 'pts/2', ':0', 1333 1436573184.0, True, 2)]) as m: 1334 self.assertEqual(psutil.users()[0].host, 'localhost') 1335 assert m.called 1336 with mock.patch('psutil._pslinux.cext.users', 1337 return_value=[('giampaolo', 'pts/2', ':0.0', 1338 1436573184.0, True, 2)]) as m: 1339 self.assertEqual(psutil.users()[0].host, 'localhost') 1340 assert m.called 1341 # ...otherwise it should be returned as-is 1342 with mock.patch('psutil._pslinux.cext.users', 1343 return_value=[('giampaolo', 'pts/2', 'foo', 1344 1436573184.0, True, 2)]) as m: 1345 self.assertEqual(psutil.users()[0].host, 'foo') 1346 assert m.called 1347 1348 def test_procfs_path(self): 1349 tdir = tempfile.mkdtemp() 1350 try: 1351 psutil.PROCFS_PATH = tdir 1352 self.assertRaises(IOError, psutil.virtual_memory) 1353 self.assertRaises(IOError, psutil.cpu_times) 1354 self.assertRaises(IOError, psutil.cpu_times, percpu=True) 1355 self.assertRaises(IOError, psutil.boot_time) 1356 # self.assertRaises(IOError, psutil.pids) 1357 self.assertRaises(IOError, psutil.net_connections) 1358 self.assertRaises(IOError, psutil.net_io_counters) 1359 self.assertRaises(IOError, psutil.net_if_stats) 1360 # self.assertRaises(IOError, psutil.disk_io_counters) 1361 self.assertRaises(IOError, psutil.disk_partitions) 1362 self.assertRaises(psutil.NoSuchProcess, psutil.Process) 1363 finally: 1364 psutil.PROCFS_PATH = "/proc" 1365 os.rmdir(tdir) 1366 1367 def test_issue_687(self): 1368 # In case of thread ID: 1369 # - pid_exists() is supposed to return False 1370 # - Process(tid) is supposed to work 1371 # - pids() should not return the TID 1372 # See: https://github.com/giampaolo/psutil/issues/687 1373 t = ThreadTask() 1374 t.start() 1375 try: 1376 p = psutil.Process() 1377 tid = p.threads()[1].id 1378 assert not psutil.pid_exists(tid), tid 1379 pt = psutil.Process(tid) 1380 pt.as_dict() 1381 self.assertNotIn(tid, psutil.pids()) 1382 finally: 1383 t.stop() 1384 1385 def test_pid_exists_no_proc_status(self): 1386 # Internally pid_exists relies on /proc/{pid}/status. 1387 # Emulate a case where this file is empty in which case 1388 # psutil is supposed to fall back on using pids(). 1389 with mock_open_content("/proc/%s/status", "") as m: 1390 assert psutil.pid_exists(os.getpid()) 1391 assert m.called 1392 1393 1394# ===================================================================== 1395# --- sensors 1396# ===================================================================== 1397 1398 1399@unittest.skipIf(not LINUX, "LINUX only") 1400@unittest.skipIf(not HAS_BATTERY, "no battery") 1401class TestSensorsBattery(unittest.TestCase): 1402 1403 @unittest.skipIf(not which("acpi"), "acpi utility not available") 1404 def test_percent(self): 1405 out = sh("acpi -b") 1406 acpi_value = int(out.split(",")[1].strip().replace('%', '')) 1407 psutil_value = psutil.sensors_battery().percent 1408 self.assertAlmostEqual(acpi_value, psutil_value, delta=1) 1409 1410 @unittest.skipIf(not which("acpi"), "acpi utility not available") 1411 def test_power_plugged(self): 1412 out = sh("acpi -b") 1413 if 'unknown' in out.lower(): 1414 return unittest.skip("acpi output not reliable") 1415 if 'discharging at zero rate' in out: 1416 plugged = True 1417 else: 1418 plugged = "Charging" in out.split('\n')[0] 1419 self.assertEqual(psutil.sensors_battery().power_plugged, plugged) 1420 1421 def test_emulate_power_plugged(self): 1422 # Pretend the AC power cable is connected. 1423 def open_mock(name, *args, **kwargs): 1424 if name.endswith("AC0/online") or name.endswith("AC/online"): 1425 return io.BytesIO(b"1") 1426 else: 1427 return orig_open(name, *args, **kwargs) 1428 1429 orig_open = open 1430 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1431 with mock.patch(patch_point, side_effect=open_mock) as m: 1432 self.assertEqual(psutil.sensors_battery().power_plugged, True) 1433 self.assertEqual( 1434 psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED) 1435 assert m.called 1436 1437 def test_emulate_power_plugged_2(self): 1438 # Same as above but pretend /AC0/online does not exist in which 1439 # case code relies on /status file. 1440 def open_mock(name, *args, **kwargs): 1441 if name.endswith("AC0/online") or name.endswith("AC/online"): 1442 raise IOError(errno.ENOENT, "") 1443 elif name.endswith("/status"): 1444 return io.StringIO(u("charging")) 1445 else: 1446 return orig_open(name, *args, **kwargs) 1447 1448 orig_open = open 1449 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1450 with mock.patch(patch_point, side_effect=open_mock) as m: 1451 self.assertEqual(psutil.sensors_battery().power_plugged, True) 1452 assert m.called 1453 1454 def test_emulate_power_not_plugged(self): 1455 # Pretend the AC power cable is not connected. 1456 def open_mock(name, *args, **kwargs): 1457 if name.endswith("AC0/online") or name.endswith("AC/online"): 1458 return io.BytesIO(b"0") 1459 else: 1460 return orig_open(name, *args, **kwargs) 1461 1462 orig_open = open 1463 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1464 with mock.patch(patch_point, side_effect=open_mock) as m: 1465 self.assertEqual(psutil.sensors_battery().power_plugged, False) 1466 assert m.called 1467 1468 def test_emulate_power_not_plugged_2(self): 1469 # Same as above but pretend /AC0/online does not exist in which 1470 # case code relies on /status file. 1471 def open_mock(name, *args, **kwargs): 1472 if name.endswith("AC0/online") or name.endswith("AC/online"): 1473 raise IOError(errno.ENOENT, "") 1474 elif name.endswith("/status"): 1475 return io.StringIO(u("discharging")) 1476 else: 1477 return orig_open(name, *args, **kwargs) 1478 1479 orig_open = open 1480 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1481 with mock.patch(patch_point, side_effect=open_mock) as m: 1482 self.assertEqual(psutil.sensors_battery().power_plugged, False) 1483 assert m.called 1484 1485 def test_emulate_power_undetermined(self): 1486 # Pretend we can't know whether the AC power cable not 1487 # connected (assert fallback to False). 1488 def open_mock(name, *args, **kwargs): 1489 if name.startswith("/sys/class/power_supply/AC0/online") or \ 1490 name.startswith("/sys/class/power_supply/AC/online"): 1491 raise IOError(errno.ENOENT, "") 1492 elif name.startswith("/sys/class/power_supply/BAT0/status"): 1493 return io.BytesIO(b"???") 1494 else: 1495 return orig_open(name, *args, **kwargs) 1496 1497 orig_open = open 1498 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1499 with mock.patch(patch_point, side_effect=open_mock) as m: 1500 self.assertIsNone(psutil.sensors_battery().power_plugged) 1501 assert m.called 1502 1503 def test_emulate_no_base_files(self): 1504 # Emulate a case where base metrics files are not present, 1505 # in which case we're supposed to get None. 1506 with mock_open_exception( 1507 "/sys/class/power_supply/BAT0/energy_now", 1508 IOError(errno.ENOENT, "")): 1509 with mock_open_exception( 1510 "/sys/class/power_supply/BAT0/charge_now", 1511 IOError(errno.ENOENT, "")): 1512 self.assertIsNone(psutil.sensors_battery()) 1513 1514 def test_emulate_energy_full_0(self): 1515 # Emulate a case where energy_full files returns 0. 1516 with mock_open_content( 1517 "/sys/class/power_supply/BAT0/energy_full", b"0") as m: 1518 self.assertEqual(psutil.sensors_battery().percent, 0) 1519 assert m.called 1520 1521 def test_emulate_energy_full_not_avail(self): 1522 # Emulate a case where energy_full file does not exist. 1523 # Expected fallback on /capacity. 1524 with mock_open_exception( 1525 "/sys/class/power_supply/BAT0/energy_full", 1526 IOError(errno.ENOENT, "")): 1527 with mock_open_exception( 1528 "/sys/class/power_supply/BAT0/charge_full", 1529 IOError(errno.ENOENT, "")): 1530 with mock_open_content( 1531 "/sys/class/power_supply/BAT0/capacity", b"88"): 1532 self.assertEqual(psutil.sensors_battery().percent, 88) 1533 1534 def test_emulate_no_power(self): 1535 # Emulate a case where /AC0/online file nor /BAT0/status exist. 1536 with mock_open_exception( 1537 "/sys/class/power_supply/AC/online", 1538 IOError(errno.ENOENT, "")): 1539 with mock_open_exception( 1540 "/sys/class/power_supply/AC0/online", 1541 IOError(errno.ENOENT, "")): 1542 with mock_open_exception( 1543 "/sys/class/power_supply/BAT0/status", 1544 IOError(errno.ENOENT, "")): 1545 self.assertIsNone(psutil.sensors_battery().power_plugged) 1546 1547 1548@unittest.skipIf(not LINUX, "LINUX only") 1549class TestSensorsTemperatures(unittest.TestCase): 1550 1551 def test_emulate_class_hwmon(self): 1552 def open_mock(name, *args, **kwargs): 1553 if name.endswith('/name'): 1554 return io.StringIO(u("name")) 1555 elif name.endswith('/temp1_label'): 1556 return io.StringIO(u("label")) 1557 elif name.endswith('/temp1_input'): 1558 return io.BytesIO(b"30000") 1559 elif name.endswith('/temp1_max'): 1560 return io.BytesIO(b"40000") 1561 elif name.endswith('/temp1_crit'): 1562 return io.BytesIO(b"50000") 1563 else: 1564 return orig_open(name, *args, **kwargs) 1565 1566 orig_open = open 1567 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1568 with mock.patch(patch_point, side_effect=open_mock): 1569 # Test case with /sys/class/hwmon 1570 with mock.patch('glob.glob', 1571 return_value=['/sys/class/hwmon/hwmon0/temp1']): 1572 temp = psutil.sensors_temperatures()['name'][0] 1573 self.assertEqual(temp.label, 'label') 1574 self.assertEqual(temp.current, 30.0) 1575 self.assertEqual(temp.high, 40.0) 1576 self.assertEqual(temp.critical, 50.0) 1577 1578 def test_emulate_class_thermal(self): 1579 def open_mock(name, *args, **kwargs): 1580 if name.endswith('0_temp'): 1581 return io.BytesIO(b"50000") 1582 elif name.endswith('temp'): 1583 return io.BytesIO(b"30000") 1584 elif name.endswith('0_type'): 1585 return io.StringIO(u("critical")) 1586 elif name.endswith('type'): 1587 return io.StringIO(u("name")) 1588 else: 1589 return orig_open(name, *args, **kwargs) 1590 1591 def glob_mock(path): 1592 if path == '/sys/class/hwmon/hwmon*/temp*_*': 1593 return [] 1594 elif path == '/sys/class/hwmon/hwmon*/device/temp*_*': 1595 return [] 1596 elif path == '/sys/class/thermal/thermal_zone*': 1597 return ['/sys/class/thermal/thermal_zone0'] 1598 elif path == '/sys/class/thermal/thermal_zone0/trip_point*': 1599 return ['/sys/class/thermal/thermal_zone1/trip_point_0_type', 1600 '/sys/class/thermal/thermal_zone1/trip_point_0_temp'] 1601 return [] 1602 1603 orig_open = open 1604 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1605 with mock.patch(patch_point, side_effect=open_mock): 1606 with mock.patch('glob.glob', create=True, side_effect=glob_mock): 1607 temp = psutil.sensors_temperatures()['name'][0] 1608 self.assertEqual(temp.label, '') 1609 self.assertEqual(temp.current, 30.0) 1610 self.assertEqual(temp.high, 50.0) 1611 self.assertEqual(temp.critical, 50.0) 1612 1613 1614@unittest.skipIf(not LINUX, "LINUX only") 1615class TestSensorsFans(unittest.TestCase): 1616 1617 def test_emulate_data(self): 1618 def open_mock(name, *args, **kwargs): 1619 if name.endswith('/name'): 1620 return io.StringIO(u("name")) 1621 elif name.endswith('/fan1_label'): 1622 return io.StringIO(u("label")) 1623 elif name.endswith('/fan1_input'): 1624 return io.StringIO(u("2000")) 1625 else: 1626 return orig_open(name, *args, **kwargs) 1627 1628 orig_open = open 1629 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1630 with mock.patch(patch_point, side_effect=open_mock): 1631 with mock.patch('glob.glob', 1632 return_value=['/sys/class/hwmon/hwmon2/fan1']): 1633 fan = psutil.sensors_fans()['name'][0] 1634 self.assertEqual(fan.label, 'label') 1635 self.assertEqual(fan.current, 2000) 1636 1637 1638# ===================================================================== 1639# --- test process 1640# ===================================================================== 1641 1642 1643@unittest.skipIf(not LINUX, "LINUX only") 1644class TestProcess(unittest.TestCase): 1645 1646 def setUp(self): 1647 safe_rmpath(TESTFN) 1648 1649 tearDown = setUp 1650 1651 def test_memory_full_info(self): 1652 src = textwrap.dedent(""" 1653 import time 1654 with open("%s", "w") as f: 1655 time.sleep(10) 1656 """ % TESTFN) 1657 sproc = pyrun(src) 1658 self.addCleanup(reap_children) 1659 call_until(lambda: os.listdir('.'), "'%s' not in ret" % TESTFN) 1660 p = psutil.Process(sproc.pid) 1661 time.sleep(.1) 1662 mem = p.memory_full_info() 1663 maps = p.memory_maps(grouped=False) 1664 self.assertAlmostEqual( 1665 mem.uss, sum([x.private_dirty + x.private_clean for x in maps]), 1666 delta=4096) 1667 self.assertAlmostEqual( 1668 mem.pss, sum([x.pss for x in maps]), delta=4096) 1669 self.assertAlmostEqual( 1670 mem.swap, sum([x.swap for x in maps]), delta=4096) 1671 1672 def test_memory_full_info_mocked(self): 1673 # See: https://github.com/giampaolo/psutil/issues/1222 1674 with mock_open_content( 1675 "/proc/%s/smaps" % os.getpid(), 1676 textwrap.dedent("""\ 1677 fffff0 r-xp 00000000 00:00 0 [vsyscall] 1678 Size: 1 kB 1679 Rss: 2 kB 1680 Pss: 3 kB 1681 Shared_Clean: 4 kB 1682 Shared_Dirty: 5 kB 1683 Private_Clean: 6 kB 1684 Private_Dirty: 7 kB 1685 Referenced: 8 kB 1686 Anonymous: 9 kB 1687 LazyFree: 10 kB 1688 AnonHugePages: 11 kB 1689 ShmemPmdMapped: 12 kB 1690 Shared_Hugetlb: 13 kB 1691 Private_Hugetlb: 14 kB 1692 Swap: 15 kB 1693 SwapPss: 16 kB 1694 KernelPageSize: 17 kB 1695 MMUPageSize: 18 kB 1696 Locked: 19 kB 1697 VmFlags: rd ex 1698 """).encode()) as m: 1699 p = psutil.Process() 1700 mem = p.memory_full_info() 1701 assert m.called 1702 self.assertEqual(mem.uss, (6 + 7 + 14) * 1024) 1703 self.assertEqual(mem.pss, 3 * 1024) 1704 self.assertEqual(mem.swap, 15 * 1024) 1705 1706 # On PYPY file descriptors are not closed fast enough. 1707 @unittest.skipIf(PYPY, "unreliable on PYPY") 1708 def test_open_files_mode(self): 1709 def get_test_file(): 1710 p = psutil.Process() 1711 giveup_at = time.time() + 2 1712 while True: 1713 for file in p.open_files(): 1714 if file.path == os.path.abspath(TESTFN): 1715 return file 1716 elif time.time() > giveup_at: 1717 break 1718 raise RuntimeError("timeout looking for test file") 1719 1720 # 1721 with open(TESTFN, "w"): 1722 self.assertEqual(get_test_file().mode, "w") 1723 with open(TESTFN, "r"): 1724 self.assertEqual(get_test_file().mode, "r") 1725 with open(TESTFN, "a"): 1726 self.assertEqual(get_test_file().mode, "a") 1727 # 1728 with open(TESTFN, "r+"): 1729 self.assertEqual(get_test_file().mode, "r+") 1730 with open(TESTFN, "w+"): 1731 self.assertEqual(get_test_file().mode, "r+") 1732 with open(TESTFN, "a+"): 1733 self.assertEqual(get_test_file().mode, "a+") 1734 # note: "x" bit is not supported 1735 if PY3: 1736 safe_rmpath(TESTFN) 1737 with open(TESTFN, "x"): 1738 self.assertEqual(get_test_file().mode, "w") 1739 safe_rmpath(TESTFN) 1740 with open(TESTFN, "x+"): 1741 self.assertEqual(get_test_file().mode, "r+") 1742 1743 def test_open_files_file_gone(self): 1744 # simulates a file which gets deleted during open_files() 1745 # execution 1746 p = psutil.Process() 1747 files = p.open_files() 1748 with tempfile.NamedTemporaryFile(): 1749 # give the kernel some time to see the new file 1750 call_until(p.open_files, "len(ret) != %i" % len(files)) 1751 with mock.patch('psutil._pslinux.os.readlink', 1752 side_effect=OSError(errno.ENOENT, "")) as m: 1753 files = p.open_files() 1754 assert not files 1755 assert m.called 1756 # also simulate the case where os.readlink() returns EINVAL 1757 # in which case psutil is supposed to 'continue' 1758 with mock.patch('psutil._pslinux.os.readlink', 1759 side_effect=OSError(errno.EINVAL, "")) as m: 1760 self.assertEqual(p.open_files(), []) 1761 assert m.called 1762 1763 def test_open_files_fd_gone(self): 1764 # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears 1765 # while iterating through fds. 1766 # https://travis-ci.org/giampaolo/psutil/jobs/225694530 1767 p = psutil.Process() 1768 files = p.open_files() 1769 with tempfile.NamedTemporaryFile(): 1770 # give the kernel some time to see the new file 1771 call_until(p.open_files, "len(ret) != %i" % len(files)) 1772 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1773 with mock.patch(patch_point, 1774 side_effect=IOError(errno.ENOENT, "")) as m: 1775 files = p.open_files() 1776 assert not files 1777 assert m.called 1778 1779 # --- mocked tests 1780 1781 def test_terminal_mocked(self): 1782 with mock.patch('psutil._pslinux._psposix.get_terminal_map', 1783 return_value={}) as m: 1784 self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) 1785 assert m.called 1786 1787 # TODO: re-enable this test. 1788 # def test_num_ctx_switches_mocked(self): 1789 # with mock.patch('psutil._common.open', create=True) as m: 1790 # self.assertRaises( 1791 # NotImplementedError, 1792 # psutil._pslinux.Process(os.getpid()).num_ctx_switches) 1793 # assert m.called 1794 1795 def test_cmdline_mocked(self): 1796 # see: https://github.com/giampaolo/psutil/issues/639 1797 p = psutil.Process() 1798 fake_file = io.StringIO(u('foo\x00bar\x00')) 1799 with mock.patch('psutil._common.open', 1800 return_value=fake_file, create=True) as m: 1801 self.assertEqual(p.cmdline(), ['foo', 'bar']) 1802 assert m.called 1803 fake_file = io.StringIO(u('foo\x00bar\x00\x00')) 1804 with mock.patch('psutil._common.open', 1805 return_value=fake_file, create=True) as m: 1806 self.assertEqual(p.cmdline(), ['foo', 'bar', '']) 1807 assert m.called 1808 1809 def test_cmdline_spaces_mocked(self): 1810 # see: https://github.com/giampaolo/psutil/issues/1179 1811 p = psutil.Process() 1812 fake_file = io.StringIO(u('foo bar ')) 1813 with mock.patch('psutil._common.open', 1814 return_value=fake_file, create=True) as m: 1815 self.assertEqual(p.cmdline(), ['foo', 'bar']) 1816 assert m.called 1817 fake_file = io.StringIO(u('foo bar ')) 1818 with mock.patch('psutil._common.open', 1819 return_value=fake_file, create=True) as m: 1820 self.assertEqual(p.cmdline(), ['foo', 'bar', '']) 1821 assert m.called 1822 1823 def test_cmdline_mixed_separators(self): 1824 # https://github.com/giampaolo/psutil/issues/ 1825 # 1179#issuecomment-552984549 1826 p = psutil.Process() 1827 fake_file = io.StringIO(u('foo\x20bar\x00')) 1828 with mock.patch('psutil._common.open', 1829 return_value=fake_file, create=True) as m: 1830 self.assertEqual(p.cmdline(), ['foo', 'bar']) 1831 assert m.called 1832 1833 def test_readlink_path_deleted_mocked(self): 1834 with mock.patch('psutil._pslinux.os.readlink', 1835 return_value='/home/foo (deleted)'): 1836 self.assertEqual(psutil.Process().exe(), "/home/foo") 1837 self.assertEqual(psutil.Process().cwd(), "/home/foo") 1838 1839 def test_threads_mocked(self): 1840 # Test the case where os.listdir() returns a file (thread) 1841 # which no longer exists by the time we open() it (race 1842 # condition). threads() is supposed to ignore that instead 1843 # of raising NSP. 1844 def open_mock(name, *args, **kwargs): 1845 if name.startswith('/proc/%s/task' % os.getpid()): 1846 raise IOError(errno.ENOENT, "") 1847 else: 1848 return orig_open(name, *args, **kwargs) 1849 1850 orig_open = open 1851 patch_point = 'builtins.open' if PY3 else '__builtin__.open' 1852 with mock.patch(patch_point, side_effect=open_mock) as m: 1853 ret = psutil.Process().threads() 1854 assert m.called 1855 self.assertEqual(ret, []) 1856 1857 # ...but if it bumps into something != ENOENT we want an 1858 # exception. 1859 def open_mock(name, *args, **kwargs): 1860 if name.startswith('/proc/%s/task' % os.getpid()): 1861 raise IOError(errno.EPERM, "") 1862 else: 1863 return orig_open(name, *args, **kwargs) 1864 1865 with mock.patch(patch_point, side_effect=open_mock): 1866 self.assertRaises(psutil.AccessDenied, psutil.Process().threads) 1867 1868 def test_exe_mocked(self): 1869 with mock.patch('psutil._pslinux.readlink', 1870 side_effect=OSError(errno.ENOENT, "")) as m1: 1871 with mock.patch('psutil.Process.cmdline', 1872 side_effect=psutil.AccessDenied(0, "")) as m2: 1873 # No such file error; might be raised also if /proc/pid/exe 1874 # path actually exists for system processes with low pids 1875 # (about 0-20). In this case psutil is supposed to return 1876 # an empty string. 1877 ret = psutil.Process().exe() 1878 assert m1.called 1879 assert m2.called 1880 self.assertEqual(ret, "") 1881 1882 # ...but if /proc/pid no longer exist we're supposed to treat 1883 # it as an alias for zombie process 1884 with mock.patch('psutil._pslinux.os.path.lexists', 1885 return_value=False): 1886 self.assertRaises( 1887 psutil.ZombieProcess, psutil.Process().exe) 1888 1889 def test_issue_1014(self): 1890 # Emulates a case where smaps file does not exist. In this case 1891 # wrap_exception decorator should not raise NoSuchProcess. 1892 with mock_open_exception( 1893 '/proc/%s/smaps' % os.getpid(), 1894 IOError(errno.ENOENT, "")) as m: 1895 p = psutil.Process() 1896 with self.assertRaises(FileNotFoundError): 1897 p.memory_maps() 1898 assert m.called 1899 1900 @unittest.skipIf(not HAS_RLIMIT, "not supported") 1901 def test_rlimit_zombie(self): 1902 # Emulate a case where rlimit() raises ENOSYS, which may 1903 # happen in case of zombie process: 1904 # https://travis-ci.org/giampaolo/psutil/jobs/51368273 1905 with mock.patch("psutil._pslinux.cext.linux_prlimit", 1906 side_effect=OSError(errno.ENOSYS, "")) as m: 1907 p = psutil.Process() 1908 p.name() 1909 with self.assertRaises(psutil.ZombieProcess) as exc: 1910 p.rlimit(psutil.RLIMIT_NOFILE) 1911 assert m.called 1912 self.assertEqual(exc.exception.pid, p.pid) 1913 self.assertEqual(exc.exception.name, p.name()) 1914 1915 def test_cwd_zombie(self): 1916 with mock.patch("psutil._pslinux.os.readlink", 1917 side_effect=OSError(errno.ENOENT, "")) as m: 1918 p = psutil.Process() 1919 p.name() 1920 with self.assertRaises(psutil.ZombieProcess) as exc: 1921 p.cwd() 1922 assert m.called 1923 self.assertEqual(exc.exception.pid, p.pid) 1924 self.assertEqual(exc.exception.name, p.name()) 1925 1926 def test_stat_file_parsing(self): 1927 from psutil._pslinux import CLOCK_TICKS 1928 1929 args = [ 1930 "0", # pid 1931 "(cat)", # name 1932 "Z", # status 1933 "1", # ppid 1934 "0", # pgrp 1935 "0", # session 1936 "0", # tty 1937 "0", # tpgid 1938 "0", # flags 1939 "0", # minflt 1940 "0", # cminflt 1941 "0", # majflt 1942 "0", # cmajflt 1943 "2", # utime 1944 "3", # stime 1945 "4", # cutime 1946 "5", # cstime 1947 "0", # priority 1948 "0", # nice 1949 "0", # num_threads 1950 "0", # itrealvalue 1951 "6", # starttime 1952 "0", # vsize 1953 "0", # rss 1954 "0", # rsslim 1955 "0", # startcode 1956 "0", # endcode 1957 "0", # startstack 1958 "0", # kstkesp 1959 "0", # kstkeip 1960 "0", # signal 1961 "0", # blocked 1962 "0", # sigignore 1963 "0", # sigcatch 1964 "0", # wchan 1965 "0", # nswap 1966 "0", # cnswap 1967 "0", # exit_signal 1968 "6", # processor 1969 "0", # rt priority 1970 "0", # policy 1971 "7", # delayacct_blkio_ticks 1972 ] 1973 content = " ".join(args).encode() 1974 with mock_open_content('/proc/%s/stat' % os.getpid(), content): 1975 p = psutil.Process() 1976 self.assertEqual(p.name(), 'cat') 1977 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE) 1978 self.assertEqual(p.ppid(), 1) 1979 self.assertEqual( 1980 p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time()) 1981 cpu = p.cpu_times() 1982 self.assertEqual(cpu.user, 2 / CLOCK_TICKS) 1983 self.assertEqual(cpu.system, 3 / CLOCK_TICKS) 1984 self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS) 1985 self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS) 1986 self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS) 1987 self.assertEqual(p.cpu_num(), 6) 1988 1989 def test_status_file_parsing(self): 1990 with mock_open_content( 1991 '/proc/%s/status' % os.getpid(), 1992 textwrap.dedent("""\ 1993 Uid:\t1000\t1001\t1002\t1003 1994 Gid:\t1004\t1005\t1006\t1007 1995 Threads:\t66 1996 Cpus_allowed:\tf 1997 Cpus_allowed_list:\t0-7 1998 voluntary_ctxt_switches:\t12 1999 nonvoluntary_ctxt_switches:\t13""").encode()): 2000 p = psutil.Process() 2001 self.assertEqual(p.num_ctx_switches().voluntary, 12) 2002 self.assertEqual(p.num_ctx_switches().involuntary, 13) 2003 self.assertEqual(p.num_threads(), 66) 2004 uids = p.uids() 2005 self.assertEqual(uids.real, 1000) 2006 self.assertEqual(uids.effective, 1001) 2007 self.assertEqual(uids.saved, 1002) 2008 gids = p.gids() 2009 self.assertEqual(gids.real, 1004) 2010 self.assertEqual(gids.effective, 1005) 2011 self.assertEqual(gids.saved, 1006) 2012 self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8))) 2013 2014 2015@unittest.skipIf(not LINUX, "LINUX only") 2016class TestProcessAgainstStatus(unittest.TestCase): 2017 """/proc/pid/stat and /proc/pid/status have many values in common. 2018 Whenever possible, psutil uses /proc/pid/stat (it's faster). 2019 For all those cases we check that the value found in 2020 /proc/pid/stat (by psutil) matches the one found in 2021 /proc/pid/status. 2022 """ 2023 2024 @classmethod 2025 def setUpClass(cls): 2026 cls.proc = psutil.Process() 2027 2028 def read_status_file(self, linestart): 2029 with psutil._psplatform.open_text( 2030 '/proc/%s/status' % self.proc.pid) as f: 2031 for line in f: 2032 line = line.strip() 2033 if line.startswith(linestart): 2034 value = line.partition('\t')[2] 2035 try: 2036 return int(value) 2037 except ValueError: 2038 return value 2039 raise ValueError("can't find %r" % linestart) 2040 2041 def test_name(self): 2042 value = self.read_status_file("Name:") 2043 self.assertEqual(self.proc.name(), value) 2044 2045 def test_status(self): 2046 value = self.read_status_file("State:") 2047 value = value[value.find('(') + 1:value.rfind(')')] 2048 value = value.replace(' ', '-') 2049 self.assertEqual(self.proc.status(), value) 2050 2051 def test_ppid(self): 2052 value = self.read_status_file("PPid:") 2053 self.assertEqual(self.proc.ppid(), value) 2054 2055 def test_num_threads(self): 2056 value = self.read_status_file("Threads:") 2057 self.assertEqual(self.proc.num_threads(), value) 2058 2059 def test_uids(self): 2060 value = self.read_status_file("Uid:") 2061 value = tuple(map(int, value.split()[1:4])) 2062 self.assertEqual(self.proc.uids(), value) 2063 2064 def test_gids(self): 2065 value = self.read_status_file("Gid:") 2066 value = tuple(map(int, value.split()[1:4])) 2067 self.assertEqual(self.proc.gids(), value) 2068 2069 @retry_on_failure() 2070 def test_num_ctx_switches(self): 2071 value = self.read_status_file("voluntary_ctxt_switches:") 2072 self.assertEqual(self.proc.num_ctx_switches().voluntary, value) 2073 value = self.read_status_file("nonvoluntary_ctxt_switches:") 2074 self.assertEqual(self.proc.num_ctx_switches().involuntary, value) 2075 2076 def test_cpu_affinity(self): 2077 value = self.read_status_file("Cpus_allowed_list:") 2078 if '-' in str(value): 2079 min_, max_ = map(int, value.split('-')) 2080 self.assertEqual( 2081 self.proc.cpu_affinity(), list(range(min_, max_ + 1))) 2082 2083 def test_cpu_affinity_eligible_cpus(self): 2084 value = self.read_status_file("Cpus_allowed_list:") 2085 with mock.patch("psutil._pslinux.per_cpu_times") as m: 2086 self.proc._proc._get_eligible_cpus() 2087 if '-' in str(value): 2088 assert not m.called 2089 else: 2090 assert m.called 2091 2092 2093# ===================================================================== 2094# --- test utils 2095# ===================================================================== 2096 2097 2098@unittest.skipIf(not LINUX, "LINUX only") 2099class TestUtils(unittest.TestCase): 2100 2101 def test_readlink(self): 2102 with mock.patch("os.readlink", return_value="foo (deleted)") as m: 2103 self.assertEqual(psutil._psplatform.readlink("bar"), "foo") 2104 assert m.called 2105 2106 def test_cat(self): 2107 fname = os.path.abspath(TESTFN) 2108 with open(fname, "wt") as f: 2109 f.write("foo ") 2110 self.assertEqual(psutil._psplatform.cat(TESTFN, binary=False), "foo") 2111 self.assertEqual(psutil._psplatform.cat(TESTFN, binary=True), b"foo") 2112 self.assertEqual( 2113 psutil._psplatform.cat(TESTFN + '??', fallback="bar"), "bar") 2114 2115 2116if __name__ == '__main__': 2117 from psutil.tests.runner import run 2118 run(__file__) 2119