1#!/usr/bin/env python3 2 3# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. 4# Use of this source code is governed by a BSD-style license that can be 5# found in the LICENSE file. 6 7""" 8Tests for detecting function memory leaks (typically the ones 9implemented in C). It does so by calling a function many times and 10checking whether process memory usage keeps increasing between 11calls or over time. 12Note that this may produce false positives (especially on Windows 13for some reason). 14PyPy appears to be completely unstable for this framework, probably 15because of how its JIT handles memory, so tests are skipped. 16""" 17 18from __future__ import print_function 19import functools 20import os 21 22import psutil 23import psutil._common 24from psutil import LINUX 25from psutil import MACOS 26from psutil import OPENBSD 27from psutil import POSIX 28from psutil import SUNOS 29from psutil import WINDOWS 30from psutil._compat import ProcessLookupError 31from psutil._compat import super 32from psutil.tests import create_sockets 33from psutil.tests import get_testfn 34from psutil.tests import HAS_CPU_AFFINITY 35from psutil.tests import HAS_CPU_FREQ 36from psutil.tests import HAS_ENVIRON 37from psutil.tests import HAS_IONICE 38from psutil.tests import HAS_MEMORY_MAPS 39from psutil.tests import HAS_NET_IO_COUNTERS 40from psutil.tests import HAS_PROC_CPU_NUM 41from psutil.tests import HAS_PROC_IO_COUNTERS 42from psutil.tests import HAS_RLIMIT 43from psutil.tests import HAS_SENSORS_BATTERY 44from psutil.tests import HAS_SENSORS_FANS 45from psutil.tests import HAS_SENSORS_TEMPERATURES 46from psutil.tests import process_namespace 47from psutil.tests import skip_on_access_denied 48from psutil.tests import spawn_testproc 49from psutil.tests import system_namespace 50from psutil.tests import terminate 51from psutil.tests import TestMemoryLeak 52from psutil.tests import unittest 53 54 55cext = psutil._psplatform.cext 56thisproc = psutil.Process() 57FEW_TIMES = 5 58 
def fewtimes_if_linux():
    """Return a decorator which shortens a leak test on Linux.

    It is meant for those Linux functions which are implemented in pure
    Python, and which we want to run faster. On Linux the decorated
    test method runs with the `times` attribute of its class
    temporarily set to FEW_TIMES; the previous value is put back when
    the test returns or raises. On any other platform the test method
    is called as-is.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(self, *args, **kwargs):
            if not LINUX:
                return fun(self, *args, **kwargs)
            saved_times = self.__class__.times
            try:
                self.__class__.times = FEW_TIMES
                return fun(self, *args, **kwargs)
            finally:
                # Put the original value back no matter how the test
                # ended, so that other tests of the class are unaffected.
                self.__class__.times = saved_times
        return wrapper
    return decorator


# ===================================================================
# Process class
# ===================================================================


class TestProcessObjectLeaks(TestMemoryLeak):
    """Leak tests for the methods of the Process class.

    Every test hands a callable to self.execute() (or to
    self.execute_w_exc() when the call is expected to raise). The
    Process instance under test is the `proc` class attribute, so that
    subclasses can point the same tests at a different process.
    """

    proc = thisproc

    def test_coverage(self):
        namespace = process_namespace(None)
        names = namespace.getters + namespace.setters
        namespace.test_class_coverage(self, names)

    @fewtimes_if_linux()
    def test_name(self):
        self.execute(self.proc.name)

    @fewtimes_if_linux()
    def test_cmdline(self):
        self.execute(self.proc.cmdline)

    @fewtimes_if_linux()
    def test_exe(self):
        self.execute(self.proc.exe)

    @fewtimes_if_linux()
    def test_ppid(self):
        self.execute(self.proc.ppid)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_uids(self):
        self.execute(self.proc.uids)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_gids(self):
        self.execute(self.proc.gids)

    @fewtimes_if_linux()
    def test_status(self):
        self.execute(self.proc.status)

    def test_nice(self):
        self.execute(self.proc.nice)

    def test_nice_set(self):
        # Write back the value which is already in place.
        current_nice = thisproc.nice()
        self.execute(lambda: self.proc.nice(current_nice))

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice(self):
        self.execute(self.proc.ionice)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice_set(self):
        if WINDOWS:
            # Write back the value which is already in place.
            current = thisproc.ionice()
            self.execute(lambda: self.proc.ionice(current))
            return
        self.execute(lambda: self.proc.ionice(psutil.IOPRIO_CLASS_NONE))
        # Also exercise the low-level setter with arguments for which
        # OSError is the expected outcome.
        bad_call = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
        self.execute_w_exc(OSError, bad_call)

    @unittest.skipIf(not HAS_PROC_IO_COUNTERS, "not supported")
    @fewtimes_if_linux()
    def test_io_counters(self):
        self.execute(self.proc.io_counters)

    @unittest.skipIf(POSIX, "worthless on POSIX")
    def test_username(self):
        # On Windows the first call always opens 1 handle (only once),
        # so do it before the measurement starts.
        psutil.Process().username()
        self.execute(self.proc.username)

    @fewtimes_if_linux()
    def test_create_time(self):
        self.execute(self.proc.create_time)

    @fewtimes_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_num_threads(self):
        self.execute(self.proc.num_threads)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_num_handles(self):
        self.execute(self.proc.num_handles)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_num_fds(self):
        self.execute(self.proc.num_fds)

    @fewtimes_if_linux()
    def test_num_ctx_switches(self):
        self.execute(self.proc.num_ctx_switches)

    @fewtimes_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_threads(self):
        self.execute(self.proc.threads)

    @fewtimes_if_linux()
    def test_cpu_times(self):
        self.execute(self.proc.cpu_times)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
    def test_cpu_num(self):
        self.execute(self.proc.cpu_num)

    @fewtimes_if_linux()
    def test_memory_info(self):
        self.execute(self.proc.memory_info)

    @fewtimes_if_linux()
    def test_memory_full_info(self):
        self.execute(self.proc.memory_full_info)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_terminal(self):
        self.execute(self.proc.terminal)

    def test_resume(self):
        # Fewer repetitions on POSIX, the class default elsewhere.
        if POSIX:
            repetitions = FEW_TIMES
        else:
            repetitions = self.times
        self.execute(self.proc.resume, times=repetitions)

    @fewtimes_if_linux()
    def test_cwd(self):
        self.execute(self.proc.cwd)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity(self):
        self.execute(self.proc.cpu_affinity)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity_set(self):
        # Write back the affinity which is already in place, then check
        # the path on which ValueError is the expected outcome.
        current = thisproc.cpu_affinity()
        self.execute(lambda: self.proc.cpu_affinity(current))
        self.execute_w_exc(ValueError, lambda: self.proc.cpu_affinity([-1]))

    @fewtimes_if_linux()
    def test_open_files(self):
        # Keep one file open for the whole duration of the measurement.
        with open(get_testfn(), 'w'):
            self.execute(self.proc.open_files)

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    @fewtimes_if_linux()
    def test_memory_maps(self):
        self.execute(self.proc.memory_maps)

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit(self):
        self.execute(lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE))

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_set(self):
        # Write back the limit which is already in place, then check the
        # path on which OSError or ValueError is the expected outcome.
        nofile_limit = thisproc.rlimit(psutil.RLIMIT_NOFILE)
        self.execute(
            lambda: self.proc.rlimit(psutil.RLIMIT_NOFILE, nofile_limit))
        expected = (OSError, ValueError)
        self.execute_w_exc(expected, lambda: self.proc.rlimit(-1))

    @fewtimes_if_linux()
    # Windows implementation is based on a single system-wide
    # function (tested later).
    @unittest.skipIf(WINDOWS, "worthless on WINDOWS")
    def test_connections(self):
        # TODO: UNIX sockets are temporarily implemented by parsing
        # 'pfiles' cmd output; we don't want that part of the code to
        # be executed.
        if SUNOS:
            kind = 'inet'
        else:
            kind = 'all'
        with create_sockets():
            self.execute(lambda: self.proc.connections(kind))

    @unittest.skipIf(not HAS_ENVIRON, "not supported")
    def test_environ(self):
        self.execute(self.proc.environ)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_proc_info(self):
        self.execute(lambda: cext.proc_info(os.getpid()))


class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
    """Run the Process tests above against a terminated process.

    The goal is to look for leaks occurring when the methods raise
    NoSuchProcess: the C functions are still invoked but will follow
    different code paths, and those are the ones being checked here.
    """

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # Spawn a subprocess, then kill it and wait for it, so that
        # `proc` refers to a process which is gone by the time the
        # tests run.
        subp = spawn_testproc()
        cls.subp = subp
        proc = psutil.Process(subp.pid)
        cls.proc = proc
        proc.kill()
        proc.wait()

    @classmethod
    def tearDownClass(cls):
        super().tearDownClass()
        terminate(cls.subp)

    def call(self, fun):
        # NoSuchProcess is the expected outcome for a terminated
        # process, so it is deliberately swallowed here.
        try:
            fun()
        except psutil.NoSuchProcess:
            pass

    if WINDOWS:

        def test_kill(self):
            self.execute(self.proc.kill)

        def test_terminate(self):
            self.execute(self.proc.terminate)

        def test_suspend(self):
            self.execute(self.proc.suspend)

        def test_resume(self):
            self.execute(self.proc.resume)

        def test_wait(self):
            self.execute(self.proc.wait)

        def test_proc_info(self):
            # test dual implementation
            def call():
                # ProcessLookupError is silenced; in that case the
                # function returns None.
                try:
                    return cext.proc_info(self.proc.pid)
                except ProcessLookupError:
                    pass

            self.execute(call)


@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestProcessDualImplementation(TestMemoryLeak):
    """Leak tests for cext.proc_cmdline() with use_peb on and off."""

    def test_cmdline_peb_true(self):
        self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=True))

    def test_cmdline_peb_false(self):
        self.execute(lambda: cext.proc_cmdline(os.getpid(), use_peb=False))
# ===================================================================
# system APIs
# ===================================================================


class TestModuleFunctionsLeaks(TestMemoryLeak):
    """Leak tests for the functions exposed at psutil module level."""

    def test_coverage(self):
        namespace = system_namespace()
        namespace.test_class_coverage(self, namespace.all)

    # --- cpu

    @fewtimes_if_linux()
    def test_cpu_count(self):  # logical
        self.execute(lambda: psutil.cpu_count(logical=True))

    @fewtimes_if_linux()
    def test_cpu_count_physical(self):
        self.execute(lambda: psutil.cpu_count(logical=False))

    @fewtimes_if_linux()
    def test_cpu_times(self):
        self.execute(psutil.cpu_times)

    @fewtimes_if_linux()
    def test_per_cpu_times(self):
        self.execute(lambda: psutil.cpu_times(percpu=True))

    @fewtimes_if_linux()
    def test_cpu_stats(self):
        self.execute(psutil.cpu_stats)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
    def test_cpu_freq(self):
        self.execute(psutil.cpu_freq)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_getloadavg(self):
        # Call it once before the measurement starts.
        psutil.getloadavg()
        self.execute(psutil.getloadavg)

    # --- mem

    def test_virtual_memory(self):
        self.execute(psutil.virtual_memory)

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS, "worthless on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute(psutil.swap_memory)

    def test_pid_exists(self):
        # Fewer repetitions on POSIX, the class default elsewhere.
        if POSIX:
            repetitions = FEW_TIMES
        else:
            repetitions = self.times
        self.execute(lambda: psutil.pid_exists(os.getpid()), times=repetitions)

    # --- disk

    def test_disk_usage(self):
        # Fewer repetitions on POSIX, the class default elsewhere.
        if POSIX:
            repetitions = FEW_TIMES
        else:
            repetitions = self.times
        self.execute(lambda: psutil.disk_usage('.'), times=repetitions)

    def test_disk_partitions(self):
        self.execute(psutil.disk_partitions)

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @fewtimes_if_linux()
    def test_disk_io_counters(self):
        self.execute(lambda: psutil.disk_io_counters(nowrap=False))

    # --- proc

    @fewtimes_if_linux()
    def test_pids(self):
        self.execute(psutil.pids)

    # --- net

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_net_io_counters(self):
        self.execute(lambda: psutil.net_io_counters(nowrap=False))

    @fewtimes_if_linux()
    @unittest.skipIf(MACOS and os.getuid() != 0, "need root access")
    def test_net_connections(self):
        # On Windows the first call always opens a handle (only once),
        # so do it before the measurement starts.
        psutil.net_connections(kind='all')
        with create_sockets():
            self.execute(lambda: psutil.net_connections(kind='all'))

    def test_net_if_addrs(self):
        # Note: verified that on Windows this was a false positive,
        # hence the larger tolerance on that platform.
        if WINDOWS:
            tolerance = 80 * 1024
        else:
            tolerance = self.tolerance
        self.execute(psutil.net_if_addrs, tolerance=tolerance)

    def test_net_if_stats(self):
        self.execute(psutil.net_if_stats)

    # --- sensors

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    def test_sensors_battery(self):
        self.execute(psutil.sensors_battery)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures(self):
        self.execute(psutil.sensors_temperatures)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_sensors_fans(self):
        self.execute(psutil.sensors_fans)

    # --- others

    @fewtimes_if_linux()
    def test_boot_time(self):
        self.execute(psutil.boot_time)

    def test_users(self):
        self.execute(psutil.users)

    if WINDOWS:

        # --- win services

        def test_win_service_iter(self):
            self.execute(cext.winservice_enumerate)

        def test_win_service_get(self):
            # Intentionally empty (presumably it exists only so that
            # test_coverage finds a test with this name -- confirm).
            pass

        def test_win_service_get_config(self):
            # Use the first service which gets enumerated.
            service = next(psutil.win_service_iter())
            service_name = service.name()
            self.execute(lambda: cext.winservice_query_config(service_name))

        def test_win_service_get_status(self):
            # Use the first service which gets enumerated.
            service = next(psutil.win_service_iter())
            service_name = service.name()
            self.execute(lambda: cext.winservice_query_status(service_name))

        def test_win_service_get_description(self):
            # Use the first service which gets enumerated.
            service = next(psutil.win_service_iter())
            service_name = service.name()
            self.execute(lambda: cext.winservice_query_descr(service_name))


if __name__ == '__main__':
    from psutil.tests.runner import run_from_name
    run_from_name(__file__)