1#!/usr/bin/env python3 2 3# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7"""Tests for psutil.Process class.""" 8 9import collections 10import errno 11import getpass 12import itertools 13import os 14import signal 15import socket 16import stat 17import subprocess 18import sys 19import textwrap 20import time 21import types 22 23import psutil 24 25from psutil import AIX 26from psutil import BSD 27from psutil import LINUX 28from psutil import MACOS 29from psutil import NETBSD 30from psutil import OPENBSD 31from psutil import OSX 32from psutil import POSIX 33from psutil import SUNOS 34from psutil import WINDOWS 35from psutil._common import open_text 36from psutil._compat import FileNotFoundError 37from psutil._compat import long 38from psutil._compat import PY3 39from psutil._compat import super 40from psutil.tests import APPVEYOR 41from psutil.tests import call_until 42from psutil.tests import CI_TESTING 43from psutil.tests import copyload_shared_lib 44from psutil.tests import create_exe 45from psutil.tests import GITHUB_ACTIONS 46from psutil.tests import GLOBAL_TIMEOUT 47from psutil.tests import HAS_CPU_AFFINITY 48from psutil.tests import HAS_ENVIRON 49from psutil.tests import HAS_IONICE 50from psutil.tests import HAS_MEMORY_MAPS 51from psutil.tests import HAS_PROC_CPU_NUM 52from psutil.tests import HAS_PROC_IO_COUNTERS 53from psutil.tests import HAS_RLIMIT 54from psutil.tests import HAS_THREADS 55from psutil.tests import mock 56from psutil.tests import process_namespace 57from psutil.tests import PsutilTestCase 58from psutil.tests import PYPY 59from psutil.tests import PYTHON_EXE 60from psutil.tests import reap_children 61from psutil.tests import retry_on_failure 62from psutil.tests import sh 63from psutil.tests import skip_on_access_denied 64from psutil.tests import skip_on_not_implemented 65from psutil.tests import ThreadTask 66from psutil.tests import unittest 67from psutil.tests import wait_for_pid 68 69 70# =================================================================== 71# --- psutil.Process class tests 72# =================================================================== 73 74 75class TestProcess(PsutilTestCase): 76 """Tests for psutil.Process class.""" 77 78 def spawn_psproc(self, *args, **kwargs): 79 sproc = self.spawn_testproc(*args, **kwargs) 80 return psutil.Process(sproc.pid) 81 82 # --- 83 84 def test_pid(self): 85 p = psutil.Process() 86 self.assertEqual(p.pid, os.getpid()) 87 with self.assertRaises(AttributeError): 88 p.pid = 33 89 90 def test_kill(self): 91 p = self.spawn_psproc() 92 p.kill() 93 code = p.wait() 94 if WINDOWS: 95 self.assertEqual(code, signal.SIGTERM) 96 else: 97 self.assertEqual(code, -signal.SIGKILL) 98 self.assertProcessGone(p) 99 100 def test_terminate(self): 101 p = self.spawn_psproc() 102 p.terminate() 103 code = p.wait() 104 if WINDOWS: 105 self.assertEqual(code, signal.SIGTERM) 106 else: 107 self.assertEqual(code, -signal.SIGTERM) 108 self.assertProcessGone(p) 109 110 def test_send_signal(self): 111 sig = signal.SIGKILL if POSIX else signal.SIGTERM 112 p = self.spawn_psproc() 113 p.send_signal(sig) 114 code = p.wait() 115 if WINDOWS: 116 self.assertEqual(code, sig) 117 else: 118 self.assertEqual(code, -sig) 119 self.assertProcessGone(p) 120 121 @unittest.skipIf(not POSIX, "not POSIX") 122 def test_send_signal_mocked(self): 123 sig = signal.SIGTERM 124 p = self.spawn_psproc() 125 with mock.patch('psutil.os.kill', 126 side_effect=OSError(errno.ESRCH, "")): 127 self.assertRaises(psutil.NoSuchProcess, p.send_signal, sig) 128 129 p = self.spawn_psproc() 130 with mock.patch('psutil.os.kill', 131 side_effect=OSError(errno.EPERM, "")): 132 self.assertRaises(psutil.AccessDenied, p.send_signal, sig) 133 134 def test_wait_exited(self): 135 # Test waitpid() + WIFEXITED -> WEXITSTATUS. 136 # normal return, same as exit(0) 137 cmd = [PYTHON_EXE, "-c", "pass"] 138 p = self.spawn_psproc(cmd) 139 code = p.wait() 140 self.assertEqual(code, 0) 141 self.assertProcessGone(p) 142 # exit(1), implicit in case of error 143 cmd = [PYTHON_EXE, "-c", "1 / 0"] 144 p = self.spawn_psproc(cmd, stderr=subprocess.PIPE) 145 code = p.wait() 146 self.assertEqual(code, 1) 147 self.assertProcessGone(p) 148 # via sys.exit() 149 cmd = [PYTHON_EXE, "-c", "import sys; sys.exit(5);"] 150 p = self.spawn_psproc(cmd) 151 code = p.wait() 152 self.assertEqual(code, 5) 153 self.assertProcessGone(p) 154 # via os._exit() 155 cmd = [PYTHON_EXE, "-c", "import os; os._exit(5);"] 156 p = self.spawn_psproc(cmd) 157 code = p.wait() 158 self.assertEqual(code, 5) 159 self.assertProcessGone(p) 160 161 def test_wait_stopped(self): 162 p = self.spawn_psproc() 163 if POSIX: 164 # Test waitpid() + WIFSTOPPED and WIFCONTINUED. 165 # Note: if a process is stopped it ignores SIGTERM. 166 p.send_signal(signal.SIGSTOP) 167 self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001) 168 p.send_signal(signal.SIGCONT) 169 self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001) 170 p.send_signal(signal.SIGTERM) 171 self.assertEqual(p.wait(), -signal.SIGTERM) 172 self.assertEqual(p.wait(), -signal.SIGTERM) 173 else: 174 p.suspend() 175 self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001) 176 p.resume() 177 self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001) 178 p.terminate() 179 self.assertEqual(p.wait(), signal.SIGTERM) 180 self.assertEqual(p.wait(), signal.SIGTERM) 181 182 def test_wait_non_children(self): 183 # Test wait() against a process which is not our direct 184 # child. 185 child, grandchild = self.spawn_children_pair() 186 self.assertRaises(psutil.TimeoutExpired, child.wait, 0.01) 187 self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01) 188 # We also terminate the direct child otherwise the 189 # grandchild will hang until the parent is gone. 190 child.terminate() 191 grandchild.terminate() 192 child_ret = child.wait() 193 grandchild_ret = grandchild.wait() 194 if POSIX: 195 self.assertEqual(child_ret, -signal.SIGTERM) 196 # For processes which are not our children we're supposed 197 # to get None. 198 self.assertEqual(grandchild_ret, None) 199 else: 200 self.assertEqual(child_ret, signal.SIGTERM) 201 self.assertEqual(child_ret, signal.SIGTERM) 202 203 def test_wait_timeout(self): 204 p = self.spawn_psproc() 205 p.name() 206 self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) 207 self.assertRaises(psutil.TimeoutExpired, p.wait, 0) 208 self.assertRaises(ValueError, p.wait, -1) 209 210 def test_wait_timeout_nonblocking(self): 211 p = self.spawn_psproc() 212 self.assertRaises(psutil.TimeoutExpired, p.wait, 0) 213 p.kill() 214 stop_at = time.time() + GLOBAL_TIMEOUT 215 while time.time() < stop_at: 216 try: 217 code = p.wait(0) 218 break 219 except psutil.TimeoutExpired: 220 pass 221 else: 222 raise self.fail('timeout') 223 if POSIX: 224 self.assertEqual(code, -signal.SIGKILL) 225 else: 226 self.assertEqual(code, signal.SIGTERM) 227 self.assertProcessGone(p) 228 229 def test_cpu_percent(self): 230 p = psutil.Process() 231 p.cpu_percent(interval=0.001) 232 p.cpu_percent(interval=0.001) 233 for x in range(100): 234 percent = p.cpu_percent(interval=None) 235 self.assertIsInstance(percent, float) 236 self.assertGreaterEqual(percent, 0.0) 237 with self.assertRaises(ValueError): 238 p.cpu_percent(interval=-1) 239 240 def test_cpu_percent_numcpus_none(self): 241 # See: https://github.com/giampaolo/psutil/issues/1087 242 with mock.patch('psutil.cpu_count', return_value=None) as m: 243 psutil.Process().cpu_percent() 244 assert m.called 245 246 def test_cpu_times(self): 247 times = psutil.Process().cpu_times() 248 assert (times.user > 0.0) or (times.system > 0.0), times 249 assert (times.children_user >= 0.0), times 250 assert (times.children_system >= 0.0), times 251 if LINUX: 252 assert times.iowait >= 0.0, times 253 # make sure returned values can be pretty printed with strftime 254 for name in times._fields: 255 time.strftime("%H:%M:%S", time.localtime(getattr(times, name))) 256 257 def test_cpu_times_2(self): 258 user_time, kernel_time = psutil.Process().cpu_times()[:2] 259 utime, ktime = os.times()[:2] 260 261 # Use os.times()[:2] as base values to compare our results 262 # using a tolerance of +/- 0.1 seconds. 263 # It will fail if the difference between the values is > 0.1s. 264 if (max([user_time, utime]) - min([user_time, utime])) > 0.1: 265 self.fail("expected: %s, found: %s" % (utime, user_time)) 266 267 if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: 268 self.fail("expected: %s, found: %s" % (ktime, kernel_time)) 269 270 @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported") 271 def test_cpu_num(self): 272 p = psutil.Process() 273 num = p.cpu_num() 274 self.assertGreaterEqual(num, 0) 275 if psutil.cpu_count() == 1: 276 self.assertEqual(num, 0) 277 self.assertIn(p.cpu_num(), range(psutil.cpu_count())) 278 279 def test_create_time(self): 280 p = self.spawn_psproc() 281 now = time.time() 282 create_time = p.create_time() 283 284 # Use time.time() as base value to compare our result using a 285 # tolerance of +/- 1 second. 286 # It will fail if the difference between the values is > 2s. 287 difference = abs(create_time - now) 288 if difference > 2: 289 self.fail("expected: %s, found: %s, difference: %s" 290 % (now, create_time, difference)) 291 292 # make sure returned value can be pretty printed with strftime 293 time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time())) 294 295 @unittest.skipIf(not POSIX, 'POSIX only') 296 def test_terminal(self): 297 terminal = psutil.Process().terminal() 298 if terminal is not None: 299 tty = os.path.realpath(sh('tty')) 300 self.assertEqual(terminal, tty) 301 302 @unittest.skipIf(not HAS_PROC_IO_COUNTERS, 'not supported') 303 @skip_on_not_implemented(only_if=LINUX) 304 def test_io_counters(self): 305 p = psutil.Process() 306 # test reads 307 io1 = p.io_counters() 308 with open(PYTHON_EXE, 'rb') as f: 309 f.read() 310 io2 = p.io_counters() 311 if not BSD and not AIX: 312 self.assertGreater(io2.read_count, io1.read_count) 313 self.assertEqual(io2.write_count, io1.write_count) 314 if LINUX: 315 self.assertGreater(io2.read_chars, io1.read_chars) 316 self.assertEqual(io2.write_chars, io1.write_chars) 317 else: 318 self.assertGreaterEqual(io2.read_bytes, io1.read_bytes) 319 self.assertGreaterEqual(io2.write_bytes, io1.write_bytes) 320 321 # test writes 322 io1 = p.io_counters() 323 with open(self.get_testfn(), 'wb') as f: 324 if PY3: 325 f.write(bytes("x" * 1000000, 'ascii')) 326 else: 327 f.write("x" * 1000000) 328 io2 = p.io_counters() 329 self.assertGreaterEqual(io2.write_count, io1.write_count) 330 self.assertGreaterEqual(io2.write_bytes, io1.write_bytes) 331 self.assertGreaterEqual(io2.read_count, io1.read_count) 332 self.assertGreaterEqual(io2.read_bytes, io1.read_bytes) 333 if LINUX: 334 self.assertGreater(io2.write_chars, io1.write_chars) 335 self.assertGreaterEqual(io2.read_chars, io1.read_chars) 336 337 # sanity check 338 for i in range(len(io2)): 339 if BSD and i >= 2: 340 # On BSD read_bytes and write_bytes are always set to -1. 341 continue 342 self.assertGreaterEqual(io2[i], 0) 343 self.assertGreaterEqual(io2[i], 0) 344 345 @unittest.skipIf(not HAS_IONICE, "not supported") 346 @unittest.skipIf(not LINUX, "linux only") 347 def test_ionice_linux(self): 348 p = psutil.Process() 349 if not CI_TESTING: 350 self.assertEqual(p.ionice()[0], psutil.IOPRIO_CLASS_NONE) 351 self.assertEqual(psutil.IOPRIO_CLASS_NONE, 0) 352 self.assertEqual(psutil.IOPRIO_CLASS_RT, 1) # high 353 self.assertEqual(psutil.IOPRIO_CLASS_BE, 2) # normal 354 self.assertEqual(psutil.IOPRIO_CLASS_IDLE, 3) # low 355 init = p.ionice() 356 try: 357 # low 358 p.ionice(psutil.IOPRIO_CLASS_IDLE) 359 self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_IDLE, 0)) 360 with self.assertRaises(ValueError): # accepts no value 361 p.ionice(psutil.IOPRIO_CLASS_IDLE, value=7) 362 # normal 363 p.ionice(psutil.IOPRIO_CLASS_BE) 364 self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 0)) 365 p.ionice(psutil.IOPRIO_CLASS_BE, value=7) 366 self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 7)) 367 with self.assertRaises(ValueError): 368 p.ionice(psutil.IOPRIO_CLASS_BE, value=8) 369 try: 370 p.ionice(psutil.IOPRIO_CLASS_RT, value=7) 371 except psutil.AccessDenied: 372 pass 373 # errs 374 self.assertRaisesRegex( 375 ValueError, "ioclass accepts no value", 376 p.ionice, psutil.IOPRIO_CLASS_NONE, 1) 377 self.assertRaisesRegex( 378 ValueError, "ioclass accepts no value", 379 p.ionice, psutil.IOPRIO_CLASS_IDLE, 1) 380 self.assertRaisesRegex( 381 ValueError, "'ioclass' argument must be specified", 382 p.ionice, value=1) 383 finally: 384 ioclass, value = init 385 if ioclass == psutil.IOPRIO_CLASS_NONE: 386 value = 0 387 p.ionice(ioclass, value) 388 389 @unittest.skipIf(not HAS_IONICE, "not supported") 390 @unittest.skipIf(not WINDOWS, 'not supported on this win version') 391 def test_ionice_win(self): 392 p = psutil.Process() 393 if not CI_TESTING: 394 self.assertEqual(p.ionice(), psutil.IOPRIO_NORMAL) 395 init = p.ionice() 396 try: 397 # base 398 p.ionice(psutil.IOPRIO_VERYLOW) 399 self.assertEqual(p.ionice(), psutil.IOPRIO_VERYLOW) 400 p.ionice(psutil.IOPRIO_LOW) 401 self.assertEqual(p.ionice(), psutil.IOPRIO_LOW) 402 try: 403 p.ionice(psutil.IOPRIO_HIGH) 404 except psutil.AccessDenied: 405 pass 406 else: 407 self.assertEqual(p.ionice(), psutil.IOPRIO_HIGH) 408 # errs 409 self.assertRaisesRegex( 410 TypeError, "value argument not accepted on Windows", 411 p.ionice, psutil.IOPRIO_NORMAL, value=1) 412 self.assertRaisesRegex( 413 ValueError, "is not a valid priority", 414 p.ionice, psutil.IOPRIO_HIGH + 1) 415 finally: 416 p.ionice(init) 417 418 @unittest.skipIf(not HAS_RLIMIT, "not supported") 419 def test_rlimit_get(self): 420 import resource 421 p = psutil.Process(os.getpid()) 422 names = [x for x in dir(psutil) if x.startswith('RLIMIT')] 423 assert names, names 424 for name in names: 425 value = getattr(psutil, name) 426 self.assertGreaterEqual(value, 0) 427 if name in dir(resource): 428 self.assertEqual(value, getattr(resource, name)) 429 # XXX - On PyPy RLIMIT_INFINITY returned by 430 # resource.getrlimit() is reported as a very big long 431 # number instead of -1. It looks like a bug with PyPy. 432 if PYPY: 433 continue 434 self.assertEqual(p.rlimit(value), resource.getrlimit(value)) 435 else: 436 ret = p.rlimit(value) 437 self.assertEqual(len(ret), 2) 438 self.assertGreaterEqual(ret[0], -1) 439 self.assertGreaterEqual(ret[1], -1) 440 441 @unittest.skipIf(not HAS_RLIMIT, "not supported") 442 def test_rlimit_set(self): 443 p = self.spawn_psproc() 444 p.rlimit(psutil.RLIMIT_NOFILE, (5, 5)) 445 self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5)) 446 # If pid is 0 prlimit() applies to the calling process and 447 # we don't want that. 448 if LINUX: 449 with self.assertRaisesRegex(ValueError, "can't use prlimit"): 450 psutil._psplatform.Process(0).rlimit(0) 451 with self.assertRaises(ValueError): 452 p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5)) 453 454 @unittest.skipIf(not HAS_RLIMIT, "not supported") 455 def test_rlimit(self): 456 p = psutil.Process() 457 testfn = self.get_testfn() 458 soft, hard = p.rlimit(psutil.RLIMIT_FSIZE) 459 try: 460 p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard)) 461 with open(testfn, "wb") as f: 462 f.write(b"X" * 1024) 463 # write() or flush() doesn't always cause the exception 464 # but close() will. 465 with self.assertRaises(IOError) as exc: 466 with open(testfn, "wb") as f: 467 f.write(b"X" * 1025) 468 self.assertEqual(exc.exception.errno if PY3 else exc.exception[0], 469 errno.EFBIG) 470 finally: 471 p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard)) 472 self.assertEqual(p.rlimit(psutil.RLIMIT_FSIZE), (soft, hard)) 473 474 @unittest.skipIf(not HAS_RLIMIT, "not supported") 475 def test_rlimit_infinity(self): 476 # First set a limit, then re-set it by specifying INFINITY 477 # and assume we overridden the previous limit. 478 p = psutil.Process() 479 soft, hard = p.rlimit(psutil.RLIMIT_FSIZE) 480 try: 481 p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard)) 482 p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard)) 483 with open(self.get_testfn(), "wb") as f: 484 f.write(b"X" * 2048) 485 finally: 486 p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard)) 487 self.assertEqual(p.rlimit(psutil.RLIMIT_FSIZE), (soft, hard)) 488 489 @unittest.skipIf(not HAS_RLIMIT, "not supported") 490 def test_rlimit_infinity_value(self): 491 # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really 492 # big number on a platform with large file support. On these 493 # platforms we need to test that the get/setrlimit functions 494 # properly convert the number to a C long long and that the 495 # conversion doesn't raise an error. 496 p = psutil.Process() 497 soft, hard = p.rlimit(psutil.RLIMIT_FSIZE) 498 self.assertEqual(psutil.RLIM_INFINITY, hard) 499 p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard)) 500 501 def test_num_threads(self): 502 # on certain platforms such as Linux we might test for exact 503 # thread number, since we always have with 1 thread per process, 504 # but this does not apply across all platforms (MACOS, Windows) 505 p = psutil.Process() 506 if OPENBSD: 507 try: 508 step1 = p.num_threads() 509 except psutil.AccessDenied: 510 raise unittest.SkipTest("on OpenBSD this requires root access") 511 else: 512 step1 = p.num_threads() 513 514 with ThreadTask(): 515 step2 = p.num_threads() 516 self.assertEqual(step2, step1 + 1) 517 518 @unittest.skipIf(not WINDOWS, 'WINDOWS only') 519 def test_num_handles(self): 520 # a better test is done later into test/_windows.py 521 p = psutil.Process() 522 self.assertGreater(p.num_handles(), 0) 523 524 @unittest.skipIf(not HAS_THREADS, 'not supported') 525 def test_threads(self): 526 p = psutil.Process() 527 if OPENBSD: 528 try: 529 step1 = p.threads() 530 except psutil.AccessDenied: 531 raise unittest.SkipTest("on OpenBSD this requires root access") 532 else: 533 step1 = p.threads() 534 535 with ThreadTask(): 536 step2 = p.threads() 537 self.assertEqual(len(step2), len(step1) + 1) 538 athread = step2[0] 539 # test named tuple 540 self.assertEqual(athread.id, athread[0]) 541 self.assertEqual(athread.user_time, athread[1]) 542 self.assertEqual(athread.system_time, athread[2]) 543 544 @retry_on_failure() 545 @skip_on_access_denied(only_if=MACOS) 546 @unittest.skipIf(not HAS_THREADS, 'not supported') 547 def test_threads_2(self): 548 p = self.spawn_psproc() 549 if OPENBSD: 550 try: 551 p.threads() 552 except psutil.AccessDenied: 553 raise unittest.SkipTest( 554 "on OpenBSD this requires root access") 555 self.assertAlmostEqual( 556 p.cpu_times().user, 557 sum([x.user_time for x in p.threads()]), delta=0.1) 558 self.assertAlmostEqual( 559 p.cpu_times().system, 560 sum([x.system_time for x in p.threads()]), delta=0.1) 561 562 @retry_on_failure() 563 def test_memory_info(self): 564 p = psutil.Process() 565 566 # step 1 - get a base value to compare our results 567 rss1, vms1 = p.memory_info()[:2] 568 percent1 = p.memory_percent() 569 self.assertGreater(rss1, 0) 570 self.assertGreater(vms1, 0) 571 572 # step 2 - allocate some memory 573 memarr = [None] * 1500000 574 575 rss2, vms2 = p.memory_info()[:2] 576 percent2 = p.memory_percent() 577 578 # step 3 - make sure that the memory usage bumped up 579 self.assertGreater(rss2, rss1) 580 self.assertGreaterEqual(vms2, vms1) # vms might be equal 581 self.assertGreater(percent2, percent1) 582 del memarr 583 584 if WINDOWS: 585 mem = p.memory_info() 586 self.assertEqual(mem.rss, mem.wset) 587 self.assertEqual(mem.vms, mem.pagefile) 588 589 mem = p.memory_info() 590 for name in mem._fields: 591 self.assertGreaterEqual(getattr(mem, name), 0) 592 593 def test_memory_full_info(self): 594 p = psutil.Process() 595 total = psutil.virtual_memory().total 596 mem = p.memory_full_info() 597 for name in mem._fields: 598 value = getattr(mem, name) 599 self.assertGreaterEqual(value, 0, msg=(name, value)) 600 if name == 'vms' and OSX or LINUX: 601 continue 602 self.assertLessEqual(value, total, msg=(name, value, total)) 603 if LINUX or WINDOWS or MACOS: 604 self.assertGreaterEqual(mem.uss, 0) 605 if LINUX: 606 self.assertGreaterEqual(mem.pss, 0) 607 self.assertGreaterEqual(mem.swap, 0) 608 609 @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported") 610 def test_memory_maps(self): 611 p = psutil.Process() 612 maps = p.memory_maps() 613 paths = [x for x in maps] 614 self.assertEqual(len(paths), len(set(paths))) 615 ext_maps = p.memory_maps(grouped=False) 616 617 for nt in maps: 618 if not nt.path.startswith('['): 619 assert os.path.isabs(nt.path), nt.path 620 if POSIX: 621 try: 622 assert os.path.exists(nt.path) or \ 623 os.path.islink(nt.path), nt.path 624 except AssertionError: 625 if not LINUX: 626 raise 627 else: 628 # https://github.com/giampaolo/psutil/issues/759 629 with open_text('/proc/self/smaps') as f: 630 data = f.read() 631 if "%s (deleted)" % nt.path not in data: 632 raise 633 else: 634 # XXX - On Windows we have this strange behavior with 635 # 64 bit dlls: they are visible via explorer but cannot 636 # be accessed via os.stat() (wtf?). 637 if '64' not in os.path.basename(nt.path): 638 try: 639 st = os.stat(nt.path) 640 except FileNotFoundError: 641 pass 642 else: 643 assert stat.S_ISREG(st.st_mode), nt.path 644 for nt in ext_maps: 645 for fname in nt._fields: 646 value = getattr(nt, fname) 647 if fname == 'path': 648 continue 649 elif fname in ('addr', 'perms'): 650 assert value, value 651 else: 652 self.assertIsInstance(value, (int, long)) 653 assert value >= 0, value 654 655 @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported") 656 def test_memory_maps_lists_lib(self): 657 # Make sure a newly loaded shared lib is listed. 658 p = psutil.Process() 659 with copyload_shared_lib() as path: 660 def normpath(p): 661 return os.path.realpath(os.path.normcase(p)) 662 libpaths = [normpath(x.path) 663 for x in p.memory_maps()] 664 self.assertIn(normpath(path), libpaths) 665 666 def test_memory_percent(self): 667 p = psutil.Process() 668 p.memory_percent() 669 self.assertRaises(ValueError, p.memory_percent, memtype="?!?") 670 if LINUX or MACOS or WINDOWS: 671 p.memory_percent(memtype='uss') 672 673 def test_is_running(self): 674 p = self.spawn_psproc() 675 assert p.is_running() 676 assert p.is_running() 677 p.kill() 678 p.wait() 679 assert not p.is_running() 680 assert not p.is_running() 681 682 def test_exe(self): 683 p = self.spawn_psproc() 684 exe = p.exe() 685 try: 686 self.assertEqual(exe, PYTHON_EXE) 687 except AssertionError: 688 if WINDOWS and len(exe) == len(PYTHON_EXE): 689 # on Windows we don't care about case sensitivity 690 normcase = os.path.normcase 691 self.assertEqual(normcase(exe), normcase(PYTHON_EXE)) 692 else: 693 # certain platforms such as BSD are more accurate returning: 694 # "/usr/local/bin/python2.7" 695 # ...instead of: 696 # "/usr/local/bin/python" 697 # We do not want to consider this difference in accuracy 698 # an error. 699 ver = "%s.%s" % (sys.version_info[0], sys.version_info[1]) 700 try: 701 self.assertEqual(exe.replace(ver, ''), 702 PYTHON_EXE.replace(ver, '')) 703 except AssertionError: 704 # Tipically MACOS. Really not sure what to do here. 705 pass 706 707 out = sh([exe, "-c", "import os; print('hey')"]) 708 self.assertEqual(out, 'hey') 709 710 def test_cmdline(self): 711 cmdline = [PYTHON_EXE, "-c", "import time; time.sleep(60)"] 712 p = self.spawn_psproc(cmdline) 713 # XXX - most of the times the underlying sysctl() call on Net 714 # and Open BSD returns a truncated string. 715 # Also /proc/pid/cmdline behaves the same so it looks 716 # like this is a kernel bug. 717 # XXX - AIX truncates long arguments in /proc/pid/cmdline 718 if NETBSD or OPENBSD or AIX: 719 self.assertEqual(p.cmdline()[0], PYTHON_EXE) 720 else: 721 self.assertEqual(' '.join(p.cmdline()), ' '.join(cmdline)) 722 723 @unittest.skipIf(PYPY, "broken on PYPY") 724 def test_long_cmdline(self): 725 testfn = self.get_testfn() 726 create_exe(testfn) 727 cmdline = [testfn] + (["0123456789"] * 20) 728 p = self.spawn_psproc(cmdline) 729 self.assertEqual(p.cmdline(), cmdline) 730 731 def test_name(self): 732 p = self.spawn_psproc(PYTHON_EXE) 733 name = p.name().lower() 734 pyexe = os.path.basename(os.path.realpath(sys.executable)).lower() 735 assert pyexe.startswith(name), (pyexe, name) 736 737 @unittest.skipIf(PYPY, "unreliable on PYPY") 738 def test_long_name(self): 739 testfn = self.get_testfn(suffix="0123456789" * 2) 740 create_exe(testfn) 741 p = self.spawn_psproc(testfn) 742 self.assertEqual(p.name(), os.path.basename(testfn)) 743 744 # XXX 745 @unittest.skipIf(SUNOS, "broken on SUNOS") 746 @unittest.skipIf(AIX, "broken on AIX") 747 @unittest.skipIf(PYPY, "broken on PYPY") 748 def test_prog_w_funky_name(self): 749 # Test that name(), exe() and cmdline() correctly handle programs 750 # with funky chars such as spaces and ")", see: 751 # https://github.com/giampaolo/psutil/issues/628 752 funky_path = self.get_testfn(suffix='foo bar )') 753 create_exe(funky_path) 754 cmdline = [funky_path, "-c", 755 "import time; [time.sleep(0.01) for x in range(3000)];" 756 "arg1", "arg2", "", "arg3", ""] 757 p = self.spawn_psproc(cmdline) 758 self.assertEqual(p.cmdline(), cmdline) 759 self.assertEqual(p.name(), os.path.basename(funky_path)) 760 self.assertEqual(os.path.normcase(p.exe()), 761 os.path.normcase(funky_path)) 762 763 @unittest.skipIf(not POSIX, 'POSIX only') 764 def test_uids(self): 765 p = psutil.Process() 766 real, effective, saved = p.uids() 767 # os.getuid() refers to "real" uid 768 self.assertEqual(real, os.getuid()) 769 # os.geteuid() refers to "effective" uid 770 self.assertEqual(effective, os.geteuid()) 771 # No such thing as os.getsuid() ("saved" uid), but starting 772 # from python 2.7 we have os.getresuid() which returns all 773 # of them. 774 if hasattr(os, "getresuid"): 775 self.assertEqual(os.getresuid(), p.uids()) 776 777 @unittest.skipIf(not POSIX, 'POSIX only') 778 def test_gids(self): 779 p = psutil.Process() 780 real, effective, saved = p.gids() 781 # os.getuid() refers to "real" uid 782 self.assertEqual(real, os.getgid()) 783 # os.geteuid() refers to "effective" uid 784 self.assertEqual(effective, os.getegid()) 785 # No such thing as os.getsgid() ("saved" gid), but starting 786 # from python 2.7 we have os.getresgid() which returns all 787 # of them. 788 if hasattr(os, "getresuid"): 789 self.assertEqual(os.getresgid(), p.gids()) 790 791 def test_nice(self): 792 p = psutil.Process() 793 self.assertRaises(TypeError, p.nice, "str") 794 init = p.nice() 795 try: 796 if WINDOWS: 797 for prio in [psutil.NORMAL_PRIORITY_CLASS, 798 psutil.IDLE_PRIORITY_CLASS, 799 psutil.BELOW_NORMAL_PRIORITY_CLASS, 800 psutil.REALTIME_PRIORITY_CLASS, 801 psutil.HIGH_PRIORITY_CLASS, 802 psutil.ABOVE_NORMAL_PRIORITY_CLASS]: 803 with self.subTest(prio=prio): 804 try: 805 p.nice(prio) 806 except psutil.AccessDenied: 807 pass 808 else: 809 self.assertEqual(p.nice(), prio) 810 else: 811 try: 812 if hasattr(os, "getpriority"): 813 self.assertEqual( 814 os.getpriority(os.PRIO_PROCESS, os.getpid()), 815 p.nice()) 816 p.nice(1) 817 self.assertEqual(p.nice(), 1) 818 if hasattr(os, "getpriority"): 819 self.assertEqual( 820 os.getpriority(os.PRIO_PROCESS, os.getpid()), 821 p.nice()) 822 # XXX - going back to previous nice value raises 823 # AccessDenied on MACOS 824 if not MACOS: 825 p.nice(0) 826 self.assertEqual(p.nice(), 0) 827 except psutil.AccessDenied: 828 pass 829 finally: 830 try: 831 p.nice(init) 832 except psutil.AccessDenied: 833 pass 834 835 def test_status(self): 836 p = psutil.Process() 837 self.assertEqual(p.status(), psutil.STATUS_RUNNING) 838 839 def test_username(self): 840 p = self.spawn_psproc() 841 username = p.username() 842 if WINDOWS: 843 domain, username = username.split('\\') 844 self.assertEqual(username, getpass.getuser()) 845 if 'USERDOMAIN' in os.environ: 846 self.assertEqual(domain, os.environ['USERDOMAIN']) 847 else: 848 self.assertEqual(username, getpass.getuser()) 849 850 def test_cwd(self): 851 p = self.spawn_psproc() 852 self.assertEqual(p.cwd(), os.getcwd()) 853 854 def test_cwd_2(self): 855 cmd = [PYTHON_EXE, "-c", 856 "import os, time; os.chdir('..'); time.sleep(60)"] 857 p = self.spawn_psproc(cmd) 858 call_until(p.cwd, "ret == os.path.dirname(os.getcwd())") 859 860 @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported') 861 def test_cpu_affinity(self): 862 p = psutil.Process() 863 initial = p.cpu_affinity() 864 assert initial, initial 865 self.addCleanup(p.cpu_affinity, initial) 866 867 if hasattr(os, "sched_getaffinity"): 868 self.assertEqual(initial, list(os.sched_getaffinity(p.pid))) 869 self.assertEqual(len(initial), len(set(initial))) 870 871 all_cpus = list(range(len(psutil.cpu_percent(percpu=True)))) 872 for n in all_cpus: 873 p.cpu_affinity([n]) 874 self.assertEqual(p.cpu_affinity(), [n]) 875 if hasattr(os, "sched_getaffinity"): 876 self.assertEqual(p.cpu_affinity(), 877 list(os.sched_getaffinity(p.pid))) 878 # also test num_cpu() 879 if hasattr(p, "num_cpu"): 880 self.assertEqual(p.cpu_affinity()[0], p.num_cpu()) 881 882 # [] is an alias for "all eligible CPUs"; on Linux this may 883 # not be equal to all available CPUs, see: 884 # https://github.com/giampaolo/psutil/issues/956 885 p.cpu_affinity([]) 886 if LINUX: 887 self.assertEqual(p.cpu_affinity(), p._proc._get_eligible_cpus()) 888 else: 889 self.assertEqual(p.cpu_affinity(), all_cpus) 890 if hasattr(os, "sched_getaffinity"): 891 self.assertEqual(p.cpu_affinity(), 892 list(os.sched_getaffinity(p.pid))) 893 # 894 self.assertRaises(TypeError, p.cpu_affinity, 1) 895 p.cpu_affinity(initial) 896 # it should work with all iterables, not only lists 897 p.cpu_affinity(set(all_cpus)) 898 p.cpu_affinity(tuple(all_cpus)) 899 900 @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported') 901 def test_cpu_affinity_errs(self): 902 p = self.spawn_psproc() 903 invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10] 904 self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu) 905 self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000)) 906 self.assertRaises(TypeError, p.cpu_affinity, [0, "1"]) 907 self.assertRaises(ValueError, p.cpu_affinity, [0, -1]) 908 909 @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported') 910 def test_cpu_affinity_all_combinations(self): 911 p = psutil.Process() 912 initial = p.cpu_affinity() 913 assert initial, initial 914 self.addCleanup(p.cpu_affinity, initial) 915 916 # All possible CPU set combinations. 917 if len(initial) > 12: 918 initial = initial[:12] # ...otherwise it will take forever 919 combos = [] 920 for i in range(0, len(initial) + 1): 921 for subset in itertools.combinations(initial, i): 922 if subset: 923 combos.append(list(subset)) 924 925 for combo in combos: 926 p.cpu_affinity(combo) 927 self.assertEqual(sorted(p.cpu_affinity()), sorted(combo)) 928 929 # TODO: #595 930 @unittest.skipIf(BSD, "broken on BSD") 931 # can't find any process file on Appveyor 932 @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR") 933 def test_open_files(self): 934 p = psutil.Process() 935 testfn = self.get_testfn() 936 files = p.open_files() 937 self.assertNotIn(testfn, files) 938 with open(testfn, 'wb') as f: 939 f.write(b'x' * 1024) 940 f.flush() 941 # give the kernel some time to see the new file 942 files = call_until(p.open_files, "len(ret) != %i" % len(files)) 943 filenames = [os.path.normcase(x.path) for x in files] 944 self.assertIn(os.path.normcase(testfn), filenames) 945 if LINUX: 946 for file in files: 947 if file.path == testfn: 948 self.assertEqual(file.position, 1024) 949 for file in files: 950 assert os.path.isfile(file.path), file 951 952 # another process 953 cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn 954 p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline]) 955 956 for x in range(100): 957 filenames = [os.path.normcase(x.path) for x in p.open_files()] 958 if testfn in filenames: 959 break 960 time.sleep(.01) 961 else: 962 self.assertIn(os.path.normcase(testfn), filenames) 963 for file in filenames: 964 assert os.path.isfile(file), file 965 966 # TODO: #595 967 @unittest.skipIf(BSD, "broken on BSD") 968 # can't find any process file on Appveyor 969 @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR") 970 def test_open_files_2(self): 971 # test fd and path fields 972 p = psutil.Process() 973 normcase = os.path.normcase 974 testfn = self.get_testfn() 975 with open(testfn, 'w') as fileobj: 976 for file in p.open_files(): 977 if normcase(file.path) == normcase(fileobj.name) or \ 978 file.fd == fileobj.fileno(): 979 break 980 else: 981 self.fail("no file found; files=%s" % repr(p.open_files())) 982 self.assertEqual(normcase(file.path), normcase(fileobj.name)) 983 if WINDOWS: 984 self.assertEqual(file.fd, -1) 985 else: 986 self.assertEqual(file.fd, fileobj.fileno()) 987 # test positions 988 ntuple = p.open_files()[0] 989 self.assertEqual(ntuple[0], ntuple.path) 990 self.assertEqual(ntuple[1], ntuple.fd) 991 # test file is gone 992 self.assertNotIn(fileobj.name, p.open_files()) 993 994 @unittest.skipIf(not POSIX, 'POSIX only') 995 def test_num_fds(self): 996 p = psutil.Process() 997 testfn = self.get_testfn() 998 start = p.num_fds() 999 file = open(testfn, 'w') 1000 self.addCleanup(file.close) 1001 self.assertEqual(p.num_fds(), start + 1) 1002 sock = socket.socket() 1003 self.addCleanup(sock.close) 1004 self.assertEqual(p.num_fds(), start + 2) 1005 file.close() 1006 sock.close() 1007 self.assertEqual(p.num_fds(), start) 1008 1009 @skip_on_not_implemented(only_if=LINUX) 1010 @unittest.skipIf(OPENBSD or NETBSD, "not reliable on OPENBSD & NETBSD") 1011 def test_num_ctx_switches(self): 1012 p = psutil.Process() 1013 before = sum(p.num_ctx_switches()) 1014 for x in range(500000): 1015 after = sum(p.num_ctx_switches()) 1016 if after > before: 1017 return 1018 self.fail("num ctx switches still the same after 50.000 iterations") 1019 1020 def test_ppid(self): 1021 p = psutil.Process() 1022 if hasattr(os, 'getppid'): 1023 self.assertEqual(p.ppid(), os.getppid()) 1024 p = self.spawn_psproc() 1025 self.assertEqual(p.ppid(), os.getpid()) 1026 if APPVEYOR: 1027 # Occasional failures, see: 1028 # https://ci.appveyor.com/project/giampaolo/psutil/build/ 1029 # job/0hs623nenj7w4m33 1030 return 1031 1032 def test_parent(self): 1033 p = self.spawn_psproc() 1034 self.assertEqual(p.parent().pid, os.getpid()) 1035 1036 lowest_pid = psutil.pids()[0] 1037 self.assertIsNone(psutil.Process(lowest_pid).parent()) 1038 1039 def test_parent_multi(self): 1040 parent = psutil.Process() 1041 child, grandchild = self.spawn_children_pair() 1042 self.assertEqual(grandchild.parent(), child) 1043 self.assertEqual(child.parent(), parent) 1044 1045 def test_parent_disappeared(self): 1046 # Emulate a case where the parent process disappeared. 1047 p = self.spawn_psproc() 1048 with mock.patch("psutil.Process", 1049 side_effect=psutil.NoSuchProcess(0, 'foo')): 1050 self.assertIsNone(p.parent()) 1051 1052 @retry_on_failure() 1053 def test_parents(self): 1054 parent = psutil.Process() 1055 assert parent.parents() 1056 child, grandchild = self.spawn_children_pair() 1057 self.assertEqual(child.parents()[0], parent) 1058 self.assertEqual(grandchild.parents()[0], child) 1059 self.assertEqual(grandchild.parents()[1], parent) 1060 1061 def test_children(self): 1062 parent = psutil.Process() 1063 self.assertEqual(parent.children(), []) 1064 self.assertEqual(parent.children(recursive=True), []) 1065 # On Windows we set the flag to 0 in order to cancel out the 1066 # CREATE_NO_WINDOW flag (enabled by default) which creates 1067 # an extra "conhost.exe" child. 1068 child = self.spawn_psproc(creationflags=0) 1069 children1 = parent.children() 1070 children2 = parent.children(recursive=True) 1071 for children in (children1, children2): 1072 self.assertEqual(len(children), 1) 1073 self.assertEqual(children[0].pid, child.pid) 1074 self.assertEqual(children[0].ppid(), parent.pid) 1075 1076 def test_children_recursive(self): 1077 # Test children() against two sub processes, p1 and p2, where 1078 # p1 (our child) spawned p2 (our grandchild). 1079 parent = psutil.Process() 1080 child, grandchild = self.spawn_children_pair() 1081 self.assertEqual(parent.children(), [child]) 1082 self.assertEqual(parent.children(recursive=True), [child, grandchild]) 1083 # If the intermediate process is gone there's no way for 1084 # children() to recursively find it. 1085 child.terminate() 1086 child.wait() 1087 self.assertEqual(parent.children(recursive=True), []) 1088 1089 def test_children_duplicates(self): 1090 # find the process which has the highest number of children 1091 table = collections.defaultdict(int) 1092 for p in psutil.process_iter(): 1093 try: 1094 table[p.ppid()] += 1 1095 except psutil.Error: 1096 pass 1097 # this is the one, now let's make sure there are no duplicates 1098 pid = sorted(table.items(), key=lambda x: x[1])[-1][0] 1099 if LINUX and pid == 0: 1100 raise self.skipTest("PID 0") 1101 p = psutil.Process(pid) 1102 try: 1103 c = p.children(recursive=True) 1104 except psutil.AccessDenied: # windows 1105 pass 1106 else: 1107 self.assertEqual(len(c), len(set(c))) 1108 1109 def test_parents_and_children(self): 1110 parent = psutil.Process() 1111 child, grandchild = self.spawn_children_pair() 1112 # forward 1113 children = parent.children(recursive=True) 1114 self.assertEqual(len(children), 2) 1115 self.assertEqual(children[0], child) 1116 self.assertEqual(children[1], grandchild) 1117 # backward 1118 parents = grandchild.parents() 1119 self.assertEqual(parents[0], child) 1120 self.assertEqual(parents[1], parent) 1121 1122 def test_suspend_resume(self): 1123 p = self.spawn_psproc() 1124 p.suspend() 1125 for x in range(100): 1126 if p.status() == psutil.STATUS_STOPPED: 1127 break 1128 time.sleep(0.01) 1129 p.resume() 1130 self.assertNotEqual(p.status(), psutil.STATUS_STOPPED) 1131 1132 def test_invalid_pid(self): 1133 self.assertRaises(TypeError, psutil.Process, "1") 1134 self.assertRaises(ValueError, psutil.Process, -1) 1135 1136 def test_as_dict(self): 1137 p = psutil.Process() 1138 d = p.as_dict(attrs=['exe', 'name']) 1139 self.assertEqual(sorted(d.keys()), ['exe', 'name']) 1140 1141 p = psutil.Process(min(psutil.pids())) 1142 d = p.as_dict(attrs=['connections'], ad_value='foo') 1143 if not isinstance(d['connections'], list): 1144 self.assertEqual(d['connections'], 'foo') 1145 1146 # Test ad_value is set on AccessDenied. 1147 with mock.patch('psutil.Process.nice', create=True, 1148 side_effect=psutil.AccessDenied): 1149 self.assertEqual( 1150 p.as_dict(attrs=["nice"], ad_value=1), {"nice": 1}) 1151 1152 # Test that NoSuchProcess bubbles up. 1153 with mock.patch('psutil.Process.nice', create=True, 1154 side_effect=psutil.NoSuchProcess(p.pid, "name")): 1155 self.assertRaises( 1156 psutil.NoSuchProcess, p.as_dict, attrs=["nice"]) 1157 1158 # Test that ZombieProcess is swallowed. 1159 with mock.patch('psutil.Process.nice', create=True, 1160 side_effect=psutil.ZombieProcess(p.pid, "name")): 1161 self.assertEqual( 1162 p.as_dict(attrs=["nice"], ad_value="foo"), {"nice": "foo"}) 1163 1164 # By default APIs raising NotImplementedError are 1165 # supposed to be skipped. 1166 with mock.patch('psutil.Process.nice', create=True, 1167 side_effect=NotImplementedError): 1168 d = p.as_dict() 1169 self.assertNotIn('nice', list(d.keys())) 1170 # ...unless the user explicitly asked for some attr. 1171 with self.assertRaises(NotImplementedError): 1172 p.as_dict(attrs=["nice"]) 1173 1174 # errors 1175 with self.assertRaises(TypeError): 1176 p.as_dict('name') 1177 with self.assertRaises(ValueError): 1178 p.as_dict(['foo']) 1179 with self.assertRaises(ValueError): 1180 p.as_dict(['foo', 'bar']) 1181 1182 def test_oneshot(self): 1183 p = psutil.Process() 1184 with mock.patch("psutil._psplatform.Process.cpu_times") as m: 1185 with p.oneshot(): 1186 p.cpu_times() 1187 p.cpu_times() 1188 self.assertEqual(m.call_count, 1) 1189 1190 with mock.patch("psutil._psplatform.Process.cpu_times") as m: 1191 p.cpu_times() 1192 p.cpu_times() 1193 self.assertEqual(m.call_count, 2) 1194 1195 def test_oneshot_twice(self): 1196 # Test the case where the ctx manager is __enter__ed twice. 1197 # The second __enter__ is supposed to resut in a NOOP. 1198 p = psutil.Process() 1199 with mock.patch("psutil._psplatform.Process.cpu_times") as m1: 1200 with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2: 1201 with p.oneshot(): 1202 p.cpu_times() 1203 p.cpu_times() 1204 with p.oneshot(): 1205 p.cpu_times() 1206 p.cpu_times() 1207 self.assertEqual(m1.call_count, 1) 1208 self.assertEqual(m2.call_count, 1) 1209 1210 with mock.patch("psutil._psplatform.Process.cpu_times") as m: 1211 p.cpu_times() 1212 p.cpu_times() 1213 self.assertEqual(m.call_count, 2) 1214 1215 def test_oneshot_cache(self): 1216 # Make sure oneshot() cache is nonglobal. Instead it's 1217 # supposed to be bound to the Process instance, see: 1218 # https://github.com/giampaolo/psutil/issues/1373 1219 p1, p2 = self.spawn_children_pair() 1220 p1_ppid = p1.ppid() 1221 p2_ppid = p2.ppid() 1222 self.assertNotEqual(p1_ppid, p2_ppid) 1223 with p1.oneshot(): 1224 self.assertEqual(p1.ppid(), p1_ppid) 1225 self.assertEqual(p2.ppid(), p2_ppid) 1226 with p2.oneshot(): 1227 self.assertEqual(p1.ppid(), p1_ppid) 1228 self.assertEqual(p2.ppid(), p2_ppid) 1229 1230 def test_halfway_terminated_process(self): 1231 # Test that NoSuchProcess exception gets raised in case the 1232 # process dies after we create the Process object. 1233 # Example: 1234 # >>> proc = Process(1234) 1235 # >>> time.sleep(2) # time-consuming task, process dies in meantime 1236 # >>> proc.name() 1237 # Refers to Issue #15 1238 def assert_raises_nsp(fun, fun_name): 1239 try: 1240 ret = fun() 1241 except psutil.ZombieProcess: # differentiate from NSP 1242 raise 1243 except psutil.NoSuchProcess: 1244 pass 1245 except psutil.AccessDenied: 1246 if OPENBSD and fun_name in ('threads', 'num_threads'): 1247 return 1248 raise 1249 else: 1250 # NtQuerySystemInformation succeeds even if process is gone. 1251 if WINDOWS and fun_name in ('exe', 'name'): 1252 return 1253 raise self.fail("%r didn't raise NSP and returned %r " 1254 "instead" % (fun, ret)) 1255 1256 p = self.spawn_psproc() 1257 p.terminate() 1258 p.wait() 1259 if WINDOWS: # XXX 1260 call_until(psutil.pids, "%s not in ret" % p.pid) 1261 self.assertProcessGone(p) 1262 1263 ns = process_namespace(p) 1264 for fun, name in ns.iter(ns.all): 1265 assert_raises_nsp(fun, name) 1266 1267 # NtQuerySystemInformation succeeds even if process is gone. 1268 if WINDOWS and not GITHUB_ACTIONS: 1269 normcase = os.path.normcase 1270 self.assertEqual(normcase(p.exe()), normcase(PYTHON_EXE)) 1271 1272 @unittest.skipIf(not POSIX, 'POSIX only') 1273 def test_zombie_process(self): 1274 def succeed_or_zombie_p_exc(fun): 1275 try: 1276 return fun() 1277 except (psutil.ZombieProcess, psutil.AccessDenied): 1278 pass 1279 1280 parent, zombie = self.spawn_zombie() 1281 # A zombie process should always be instantiable 1282 zproc = psutil.Process(zombie.pid) 1283 # ...and at least its status always be querable 1284 self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE) 1285 # ...and it should be considered 'running' 1286 assert zproc.is_running() 1287 # ...and as_dict() shouldn't crash 1288 zproc.as_dict() 1289 # ...its parent should 'see' it (edit: not true on BSD and MACOS 1290 # descendants = [x.pid for x in psutil.Process().children( 1291 # recursive=True)] 1292 # self.assertIn(zpid, descendants) 1293 # XXX should we also assume ppid be usable? Note: this 1294 # would be an important use case as the only way to get 1295 # rid of a zombie is to kill its parent. 1296 # self.assertEqual(zpid.ppid(), os.getpid()) 1297 # ...and all other APIs should be able to deal with it 1298 1299 ns = process_namespace(zproc) 1300 for fun, name in ns.iter(ns.all): 1301 succeed_or_zombie_p_exc(fun) 1302 1303 assert psutil.pid_exists(zproc.pid) 1304 self.assertIn(zproc.pid, psutil.pids()) 1305 self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()]) 1306 psutil._pmap = {} 1307 self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()]) 1308 1309 @unittest.skipIf(not POSIX, 'POSIX only') 1310 def test_zombie_process_is_running_w_exc(self): 1311 # Emulate a case where internally is_running() raises 1312 # ZombieProcess. 1313 p = psutil.Process() 1314 with mock.patch("psutil.Process", 1315 side_effect=psutil.ZombieProcess(0)) as m: 1316 assert p.is_running() 1317 assert m.called 1318 1319 @unittest.skipIf(not POSIX, 'POSIX only') 1320 def test_zombie_process_status_w_exc(self): 1321 # Emulate a case where internally status() raises 1322 # ZombieProcess. 1323 p = psutil.Process() 1324 with mock.patch("psutil._psplatform.Process.status", 1325 side_effect=psutil.ZombieProcess(0)) as m: 1326 self.assertEqual(p.status(), psutil.STATUS_ZOMBIE) 1327 assert m.called 1328 1329 def test_pid_0(self): 1330 # Process(0) is supposed to work on all platforms except Linux 1331 if 0 not in psutil.pids(): 1332 self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0) 1333 # These 2 are a contradiction, but "ps" says PID 1's parent 1334 # is PID 0. 1335 assert not psutil.pid_exists(0) 1336 self.assertEqual(psutil.Process(1).ppid(), 0) 1337 return 1338 1339 p = psutil.Process(0) 1340 exc = psutil.AccessDenied if WINDOWS else ValueError 1341 self.assertRaises(exc, p.wait) 1342 self.assertRaises(exc, p.terminate) 1343 self.assertRaises(exc, p.suspend) 1344 self.assertRaises(exc, p.resume) 1345 self.assertRaises(exc, p.kill) 1346 self.assertRaises(exc, p.send_signal, signal.SIGTERM) 1347 1348 # test all methods 1349 ns = process_namespace(p) 1350 for fun, name in ns.iter(ns.getters + ns.setters): 1351 try: 1352 ret = fun() 1353 except psutil.AccessDenied: 1354 pass 1355 else: 1356 if name in ("uids", "gids"): 1357 self.assertEqual(ret.real, 0) 1358 elif name == "username": 1359 user = 'NT AUTHORITY\\SYSTEM' if WINDOWS else 'root' 1360 self.assertEqual(p.username(), user) 1361 elif name == "name": 1362 assert name, name 1363 1364 if not OPENBSD: 1365 self.assertIn(0, psutil.pids()) 1366 assert psutil.pid_exists(0) 1367 1368 @unittest.skipIf(not HAS_ENVIRON, "not supported") 1369 def test_environ(self): 1370 def clean_dict(d): 1371 # Most of these are problematic on Travis. 1372 d.pop("PSUTIL_TESTING", None) 1373 d.pop("PLAT", None) 1374 d.pop("HOME", None) 1375 if MACOS: 1376 d.pop("__CF_USER_TEXT_ENCODING", None) 1377 d.pop("VERSIONER_PYTHON_PREFER_32_BIT", None) 1378 d.pop("VERSIONER_PYTHON_VERSION", None) 1379 return dict( 1380 [(k.replace("\r", "").replace("\n", ""), 1381 v.replace("\r", "").replace("\n", "")) 1382 for k, v in d.items()]) 1383 1384 self.maxDiff = None 1385 p = psutil.Process() 1386 d1 = clean_dict(p.environ()) 1387 d2 = clean_dict(os.environ.copy()) 1388 if not OSX and GITHUB_ACTIONS: 1389 self.assertEqual(d1, d2) 1390 1391 @unittest.skipIf(not HAS_ENVIRON, "not supported") 1392 @unittest.skipIf(not POSIX, "POSIX only") 1393 def test_weird_environ(self): 1394 # environment variables can contain values without an equals sign 1395 code = textwrap.dedent(""" 1396 #include <unistd.h> 1397 #include <fcntl.h> 1398 char * const argv[] = {"cat", 0}; 1399 char * const envp[] = {"A=1", "X", "C=3", 0}; 1400 int main(void) { 1401 /* Close stderr on exec so parent can wait for the execve to 1402 * finish. */ 1403 if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0) 1404 return 0; 1405 return execve("/bin/cat", argv, envp); 1406 } 1407 """) 1408 path = self.get_testfn() 1409 create_exe(path, c_code=code) 1410 sproc = self.spawn_testproc( 1411 [path], stdin=subprocess.PIPE, stderr=subprocess.PIPE) 1412 p = psutil.Process(sproc.pid) 1413 wait_for_pid(p.pid) 1414 assert p.is_running() 1415 # Wait for process to exec or exit. 1416 self.assertEqual(sproc.stderr.read(), b"") 1417 if MACOS and CI_TESTING: 1418 try: 1419 env = p.environ() 1420 except psutil.AccessDenied: 1421 # XXX: fails sometimes with: 1422 # PermissionError from 'sysctl(KERN_PROCARGS2) -> EIO' 1423 return 1424 else: 1425 env = p.environ() 1426 self.assertEqual(env, {"A": "1", "C": "3"}) 1427 sproc.communicate() 1428 self.assertEqual(sproc.returncode, 0) 1429 1430 1431# =================================================================== 1432# --- Limited user tests 1433# =================================================================== 1434 1435 1436if POSIX and os.getuid() == 0: 1437 1438 class LimitedUserTestCase(TestProcess): 1439 """Repeat the previous tests by using a limited user. 1440 Executed only on UNIX and only if the user who run the test script 1441 is root. 1442 """ 1443 # the uid/gid the test suite runs under 1444 if hasattr(os, 'getuid'): 1445 PROCESS_UID = os.getuid() 1446 PROCESS_GID = os.getgid() 1447 1448 def __init__(self, *args, **kwargs): 1449 super().__init__(*args, **kwargs) 1450 # re-define all existent test methods in order to 1451 # ignore AccessDenied exceptions 1452 for attr in [x for x in dir(self) if x.startswith('test')]: 1453 meth = getattr(self, attr) 1454 1455 def test_(self): 1456 try: 1457 meth() 1458 except psutil.AccessDenied: 1459 pass 1460 setattr(self, attr, types.MethodType(test_, self)) 1461 1462 def setUp(self): 1463 super().setUp() 1464 os.setegid(1000) 1465 os.seteuid(1000) 1466 1467 def tearDown(self): 1468 os.setegid(self.PROCESS_UID) 1469 os.seteuid(self.PROCESS_GID) 1470 super().tearDown() 1471 1472 def test_nice(self): 1473 try: 1474 psutil.Process().nice(-1) 1475 except psutil.AccessDenied: 1476 pass 1477 else: 1478 self.fail("exception not raised") 1479 1480 @unittest.skipIf(1, "causes problem as root") 1481 def test_zombie_process(self): 1482 pass 1483 1484 1485# =================================================================== 1486# --- psutil.Popen tests 1487# =================================================================== 1488 1489 1490class TestPopen(PsutilTestCase): 1491 """Tests for psutil.Popen class.""" 1492 1493 @classmethod 1494 def tearDownClass(cls): 1495 reap_children() 1496 1497 def test_misc(self): 1498 # XXX this test causes a ResourceWarning on Python 3 because 1499 # psutil.__subproc instance doesn't get propertly freed. 1500 # Not sure what to do though. 1501 cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"] 1502 with psutil.Popen(cmd, stdout=subprocess.PIPE, 1503 stderr=subprocess.PIPE) as proc: 1504 proc.name() 1505 proc.cpu_times() 1506 proc.stdin 1507 self.assertTrue(dir(proc)) 1508 self.assertRaises(AttributeError, getattr, proc, 'foo') 1509 proc.terminate() 1510 if POSIX: 1511 self.assertEqual(proc.wait(5), -signal.SIGTERM) 1512 else: 1513 self.assertEqual(proc.wait(5), signal.SIGTERM) 1514 1515 def test_ctx_manager(self): 1516 with psutil.Popen([PYTHON_EXE, "-V"], 1517 stdout=subprocess.PIPE, 1518 stderr=subprocess.PIPE, 1519 stdin=subprocess.PIPE) as proc: 1520 proc.communicate() 1521 assert proc.stdout.closed 1522 assert proc.stderr.closed 1523 assert proc.stdin.closed 1524 self.assertEqual(proc.returncode, 0) 1525 1526 def test_kill_terminate(self): 1527 # subprocess.Popen()'s terminate(), kill() and send_signal() do 1528 # not raise exception after the process is gone. psutil.Popen 1529 # diverges from that. 1530 cmd = [PYTHON_EXE, "-c", "import time; time.sleep(60);"] 1531 with psutil.Popen(cmd, stdout=subprocess.PIPE, 1532 stderr=subprocess.PIPE) as proc: 1533 proc.terminate() 1534 proc.wait() 1535 self.assertRaises(psutil.NoSuchProcess, proc.terminate) 1536 self.assertRaises(psutil.NoSuchProcess, proc.kill) 1537 self.assertRaises(psutil.NoSuchProcess, proc.send_signal, 1538 signal.SIGTERM) 1539 if WINDOWS and sys.version_info >= (2, 7): 1540 self.assertRaises(psutil.NoSuchProcess, proc.send_signal, 1541 signal.CTRL_C_EVENT) 1542 self.assertRaises(psutil.NoSuchProcess, proc.send_signal, 1543 signal.CTRL_BREAK_EVENT) 1544 1545 1546if __name__ == '__main__': 1547 from psutil.tests.runner import run_from_name 1548 run_from_name(__file__) 1549