#!/usr/bin/env python

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""
A test script which attempts to detect memory leaks by calling C
functions many times and comparing process memory usage before and
after the calls.  It might produce false positives.
"""

import functools
import gc
import os
import socket
import sys
import threading
import time

import psutil
import psutil._common

from psutil._compat import xrange, callable
from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, BSD, TESTFN,
                         RLIMIT_SUPPORT, TRAVIS)
from test_psutil import (reap_children, supports_ipv6, safe_remove,
                         get_test_subprocess)

if sys.version_info < (2, 7):
    import unittest2 as unittest  # https://pypi.python.org/pypi/unittest2
else:
    import unittest


# Number of times each function is invoked per measurement pass.
LOOPS = 1000
# Maximum RSS growth (as reported by memory_info()[0]) tolerated between
# two passes before the slower 3-second confirmation phase kicks in.
TOLERANCE = 4096
# When true, tests flagged as "pure python" implementations are skipped.
SKIP_PYTHON_IMPL = True


def skip_if_linux():
    """Return a decorator skipping the test on Linux when
    SKIP_PYTHON_IMPL is set.
    """
    return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL,
                           "not worth being tested on LINUX (pure python)")


class Base(unittest.TestCase):
    """Common machinery: call a function many times and compare the
    process RSS before and after.  Subclasses must implement call().
    """
    proc = psutil.Process()

    def execute(self, function, *args, **kwargs):
        """Repeatedly invoke self.call(function, *args, **kwargs) and
        fail the test if memory usage keeps growing.

        Two passes of LOOPS - 1 calls are measured; if the second pass
        grew by more than TOLERANCE the function is called for 3 more
        seconds and any further growth is reported as a failure.
        """
        def call_many_times():
            for x in xrange(LOOPS - 1):
                self.call(function, *args, **kwargs)
            # Drop the loop variable before sampling memory.
            del x
            gc.collect()
            return self.get_mem()

        # Warm-up call; also verifies the call left no uncollectable
        # garbage and no stray threads behind.
        self.call(function, *args, **kwargs)
        self.assertEqual(gc.garbage, [])
        self.assertEqual(threading.active_count(), 1)

        # RSS comparison
        # step 1
        rss1 = call_many_times()
        # step 2
        rss2 = call_many_times()

        difference = rss2 - rss1
        if difference > TOLERANCE:
            # This doesn't necessarily mean we have a leak yet.
            # At this point we assume that after having called the
            # function so many times the memory usage is stabilized
            # and if there are no leaks it should not increase any
            # more.
            # Let's keep calling fun for 3 more seconds and fail if
            # we notice any difference.
            stop_at = time.time() + 3
            while True:
                self.call(function, *args, **kwargs)
                if time.time() >= stop_at:
                    break
            del stop_at
            gc.collect()
            rss3 = self.get_mem()
            difference = rss3 - rss2
            if rss3 > rss2:
                self.fail("rss2=%s, rss3=%s, difference=%s"
                          % (rss2, rss3, difference))

    def execute_w_exc(self, exc, function, *args, **kwargs):
        """Like execute() but passes the expected exception class to
        call() through the reserved '_exc' keyword argument.
        """
        kwargs['_exc'] = exc
        self.execute(function, *args, **kwargs)

    def get_mem(self):
        """Return the first field of memory_info() for this process."""
        # A fresh Process() is used on purpose: subclasses may override
        # self.proc with a different (even dead) process.
        return psutil.Process().memory_info()[0]

    def call(self, function, *args, **kwargs):
        """Invoke the function under test; implemented by subclasses."""
        raise NotImplementedError("must be implemented in subclass")


class TestProcessObjectLeaks(Base):
    """Test leaks of Process class methods and properties"""

    def setUp(self):
        gc.collect()

    def tearDown(self):
        reap_children()

    def call(self, function, *args, **kwargs):
        """Call *function* with the given arguments.

        *function* is either a callable or the name of a method looked
        up on self.proc.  If the '_exc' keyword is present the call is
        expected to raise that exception; otherwise psutil.Error is
        silently ignored.
        """
        if callable(function):
            target = function
        else:
            target = getattr(self.proc, function)

        if '_exc' in kwargs:
            exc = kwargs.pop('_exc')
            self.assertRaises(exc, target, *args, **kwargs)
        else:
            try:
                target(*args, **kwargs)
            except psutil.Error:
                pass

    @skip_if_linux()
    def test_name(self):
        self.execute('name')

    @skip_if_linux()
    def test_cmdline(self):
        self.execute('cmdline')

    @skip_if_linux()
    def test_exe(self):
        self.execute('exe')

    @skip_if_linux()
    def test_ppid(self):
        self.execute('ppid')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_uids(self):
        self.execute('uids')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_gids(self):
        self.execute('gids')

    @skip_if_linux()
    def test_status(self):
        self.execute('status')

    def test_nice_get(self):
        self.execute('nice')

    def test_nice_set(self):
        # Re-apply the current niceness so the process is not altered.
        niceness = psutil.Process().nice()
        self.execute('nice', niceness)

    @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
                         "Linux and Windows Vista only")
    def test_ionice_get(self):
        self.execute('ionice')

    @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
                         "Linux and Windows Vista only")
    def test_ionice_set(self):
        if WINDOWS:
            value = psutil.Process().ionice()
            self.execute('ionice', value)
        else:
            from psutil._pslinux import cext
            self.execute('ionice', psutil.IOPRIO_CLASS_NONE)
            # Also exercise the error path of the C function, which is
            # expected to raise OSError for these arguments.
            fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
            self.execute_w_exc(OSError, fun)

    @unittest.skipIf(OSX or SUNOS, "feature not supported on this platform")
    @skip_if_linux()
    def test_io_counters(self):
        self.execute('io_counters')

    @unittest.skipUnless(WINDOWS, "not worth being tested on posix")
    def test_username(self):
        self.execute('username')

    @skip_if_linux()
    def test_create_time(self):
        self.execute('create_time')

    @skip_if_linux()
    def test_num_threads(self):
        self.execute('num_threads')

    @unittest.skipUnless(WINDOWS, "Windows only")
    def test_num_handles(self):
        self.execute('num_handles')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_num_fds(self):
        self.execute('num_fds')

    @skip_if_linux()
    def test_threads(self):
        self.execute('threads')

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute('cpu_times')

    @skip_if_linux()
    def test_memory_info(self):
        self.execute('memory_info')

    @skip_if_linux()
    def test_memory_info_ex(self):
        self.execute('memory_info_ex')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_terminal(self):
        self.execute('terminal')

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_resume(self):
        self.execute('resume')

    @skip_if_linux()
    def test_cwd(self):
        self.execute('cwd')

    @unittest.skipUnless(WINDOWS or LINUX or BSD,
                         "Windows or Linux or BSD only")
    def test_cpu_affinity_get(self):
        self.execute('cpu_affinity')

    @unittest.skipUnless(WINDOWS or LINUX or BSD,
                         "Windows or Linux or BSD only")
    def test_cpu_affinity_set(self):
        # Re-apply the current affinity so the process is not altered.
        affinity = psutil.Process().cpu_affinity()
        self.execute('cpu_affinity', affinity)
        if not TRAVIS:
            self.execute_w_exc(ValueError, 'cpu_affinity', [-1])

    @skip_if_linux()
    def test_open_files(self):
        safe_remove(TESTFN)  # needed after UNIX socket test has run
        with open(TESTFN, 'w'):
            self.execute('open_files')

    # OSX implementation is unbelievably slow
    @unittest.skipIf(OSX, "OSX implementation is too slow")
    @skip_if_linux()
    def test_memory_maps(self):
        self.execute('memory_maps')

    @unittest.skipUnless(LINUX, "Linux only")
    @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                         "only available on Linux >= 2.6.36")
    def test_rlimit_get(self):
        self.execute('rlimit', psutil.RLIMIT_NOFILE)

    @unittest.skipUnless(LINUX, "Linux only")
    @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                         "only available on Linux >= 2.6.36")
    def test_rlimit_set(self):
        # Re-apply the current limit so the process is not altered.
        limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)
        self.execute('rlimit', psutil.RLIMIT_NOFILE, limit)
        self.execute_w_exc(OSError, 'rlimit', -1)

    @skip_if_linux()
    # Windows implementation is based on a single system-wide function
    @unittest.skipIf(WINDOWS, "tested later")
    def test_connections(self):
        def create_socket(family, sock_type):
            # Close the socket if it cannot be fully set up, so a
            # failing bind()/listen() does not leak a descriptor.
            sock = socket.socket(family, sock_type)
            try:
                sock.bind(('', 0))
                if sock_type == socket.SOCK_STREAM:
                    sock.listen(1)
            except Exception:
                sock.close()
                raise
            return sock

        socks = []
        # Socket setup happens inside the try block so that sockets
        # already opened are closed even if a later one fails.
        try:
            socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM))
            socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM))
            if supports_ipv6():
                socks.append(
                    create_socket(socket.AF_INET6, socket.SOCK_STREAM))
                socks.append(
                    create_socket(socket.AF_INET6, socket.SOCK_DGRAM))
            if hasattr(socket, 'AF_UNIX'):
                safe_remove(TESTFN)
                s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
                # Register before bind() so it is closed on failure too.
                socks.append(s)
                s.bind(TESTFN)
                s.listen(1)
            kind = 'all'
            # TODO: UNIX sockets are temporarily implemented by parsing
            # 'pfiles' cmd output; we don't want that part of the code to
            # be executed.
            if SUNOS:
                kind = 'inet'
            self.execute('connections', kind=kind)
        finally:
            for s in socks:
                s.close()


# Spawn a subprocess and kill it right away: DEAD_PROC is a Process
# object referring to a process which is already gone.
p = get_test_subprocess()
DEAD_PROC = psutil.Process(p.pid)
DEAD_PROC.kill()
DEAD_PROC.wait()
del p


class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
    """Same as above but looks for leaks occurring when dealing with
    zombie processes raising NoSuchProcess exception.
    """
    proc = DEAD_PROC

    def call(self, *args, **kwargs):
        """Same as the parent's call() but NoSuchProcess is ignored."""
        try:
            TestProcessObjectLeaks.call(self, *args, **kwargs)
        except psutil.NoSuchProcess:
            pass

    if not POSIX:
        def test_kill(self):
            self.execute('kill')

        def test_terminate(self):
            self.execute('terminate')

        def test_suspend(self):
            self.execute('suspend')

        def test_resume(self):
            self.execute('resume')

        def test_wait(self):
            self.execute('wait')


class TestModuleFunctionsLeaks(Base):
    """Test leaks of psutil module functions."""

    def setUp(self):
        gc.collect()

    def call(self, function, *args, **kwargs):
        """Look up *function* by name on the psutil module and call it."""
        fun = getattr(psutil, function)
        fun(*args, **kwargs)

    def _execute_cpu_count(self, impl):
        """Run the leak check with psutil.cpu_count temporarily replaced
        by *impl*, restoring the original function afterwards so the
        psutil module is not left patched for whatever runs next.
        """
        original = psutil.cpu_count
        psutil.cpu_count = impl
        try:
            self.execute('cpu_count')
        finally:
            psutil.cpu_count = original

    @skip_if_linux()
    def test_cpu_count_logical(self):
        self._execute_cpu_count(psutil._psplatform.cpu_count_logical)

    @skip_if_linux()
    def test_cpu_count_physical(self):
        self._execute_cpu_count(psutil._psplatform.cpu_count_physical)

    @skip_if_linux()
    def test_boot_time(self):
        self.execute('boot_time')

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_pid_exists(self):
        self.execute('pid_exists', os.getpid())

    def test_virtual_memory(self):
        self.execute('virtual_memory')

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS,
                     "not worth being tested on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute('swap_memory')

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute('cpu_times')

    @skip_if_linux()
    def test_per_cpu_times(self):
        self.execute('cpu_times', percpu=True)

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_disk_usage(self):
        self.execute('disk_usage', '.')

    def test_disk_partitions(self):
        self.execute('disk_partitions')

    @skip_if_linux()
    def test_net_io_counters(self):
        self.execute('net_io_counters')

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @skip_if_linux()
    def test_disk_io_counters(self):
        self.execute('disk_io_counters')

    # XXX - on Windows this produces a false positive
    @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
    def test_users(self):
        self.execute('users')

    @unittest.skipIf(LINUX,
                     "not worth being tested on Linux (pure python)")
    def test_net_connections(self):
        self.execute('net_connections')

    def test_net_if_addrs(self):
        self.execute('net_if_addrs')

    @unittest.skipIf(TRAVIS, "EPERM on travis")
    def test_net_if_stats(self):
        self.execute('net_if_stats')


def main():
    """Run all leak test cases; return True if every test passed."""
    test_suite = unittest.TestSuite()
    tests = [TestProcessObjectLeaksZombie,
             TestProcessObjectLeaks,
             TestModuleFunctionsLeaks]
    for test in tests:
        test_suite.addTest(unittest.makeSuite(test))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()


if __name__ == '__main__':
    if not main():
        sys.exit(1)