1#!/usr/bin/env python
2
3# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""
A test script which attempts to detect memory leaks by calling C
functions many times and comparing process memory usage before and
after the calls.  It might produce false positives.
11"""
12
13import functools
14import gc
15import os
16import socket
17import sys
18import threading
19import time
20
21import psutil
22import psutil._common
23
24from psutil._compat import xrange, callable
25from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, BSD, TESTFN,
26                         RLIMIT_SUPPORT, TRAVIS)
27from test_psutil import (reap_children, supports_ipv6, safe_remove,
28                         get_test_subprocess)
29
30if sys.version_info < (2, 7):
31    import unittest2 as unittest  # https://pypi.python.org/pypi/unittest2
32else:
33    import unittest
34
35
36LOOPS = 1000
37TOLERANCE = 4096
38SKIP_PYTHON_IMPL = True
39
40
def skip_if_linux():
    """Return a decorator which skips the decorated test on Linux.

    The skip is only active while SKIP_PYTHON_IMPL is true.
    """
    should_skip = LINUX and SKIP_PYTHON_IMPL
    reason = "not worth being tested on LINUX (pure python)"
    return unittest.skipIf(should_skip, reason)
44
45
class Base(unittest.TestCase):
    """Common machinery shared by the leak test cases.

    Subclasses implement call(); execute() invokes it repeatedly and
    compares the value returned by get_mem() between rounds of calls.
    """
    # Process object against which subclasses resolve method names.
    proc = psutil.Process()

    def execute(self, function, *args, **kwargs):
        """Call *function* many times and fail if memory keeps growing.

        *function*, *args* and *kwargs* are forwarded untouched to
        self.call() on every iteration.  The test fails (via
        self.fail()) only if memory usage grows by more than TOLERANCE
        between the first two rounds *and* is still growing after 3
        more seconds of calls.
        """
        def call_many_times():
            # Bind the loop variable up front so that the "del" below
            # cannot raise NameError when LOOPS <= 1 (empty loop).
            x = None
            for x in xrange(LOOPS - 1):
                self.call(function, *args, **kwargs)
            del x
            gc.collect()
            return self.get_mem()

        # Warm-up call, then make sure it left neither uncollectable
        # garbage nor extra threads behind.
        self.call(function, *args, **kwargs)
        self.assertEqual(gc.garbage, [])
        self.assertEqual(threading.active_count(), 1)

        # RSS comparison
        # step 1
        rss1 = call_many_times()
        # step 2
        rss2 = call_many_times()

        difference = rss2 - rss1
        if difference > TOLERANCE:
            # This doesn't necessarily mean we have a leak yet.
            # At this point we assume that after having called the
            # function so many times the memory usage is stabilized
            # and if there are no leaks it should not increase any
            # more.
            # Let's keep calling fun for 3 more seconds and fail if
            # we notice any difference.
            stop_at = time.time() + 3
            while True:
                self.call(function, *args, **kwargs)
                if time.time() >= stop_at:
                    break
            del stop_at
            gc.collect()
            rss3 = self.get_mem()
            difference = rss3 - rss2
            if rss3 > rss2:
                self.fail("rss2=%s, rss3=%s, difference=%s"
                          % (rss2, rss3, difference))

    def execute_w_exc(self, exc, function, *args, **kwargs):
        """Same as execute() but passes *exc* along as the '_exc' kwarg.

        It is up to the subclass' call() to interpret '_exc'.
        """
        kwargs['_exc'] = exc
        self.execute(function, *args, **kwargs)

    def get_mem(self):
        """Return the first field of memory_info() for this process."""
        return psutil.Process().memory_info()[0]

    def call(self, function, *args, **kwargs):
        """Invoke *function* once; subclasses must override this."""
        raise NotImplementedError("must be implemented in subclass")
98
99
class TestProcessObjectLeaks(Base):
    """Test leaks of Process class methods and properties"""

    def setUp(self):
        gc.collect()

    def tearDown(self):
        reap_children()

    def call(self, function, *args, **kwargs):
        """Invoke *function* once on behalf of execute().

        *function* is either a callable, used as is, or the name of a
        method looked up on self.proc.  If kwargs contains '_exc' the
        call is expected to raise that exception (checked with
        assertRaises); otherwise psutil.Error is silently ignored.
        """
        if callable(function):
            target = function
        else:
            target = getattr(self.proc, function)

        if '_exc' in kwargs:
            exc = kwargs.pop('_exc')
            self.assertRaises(exc, target, *args, **kwargs)
        else:
            try:
                target(*args, **kwargs)
            except psutil.Error:
                pass

    @skip_if_linux()
    def test_name(self):
        self.execute('name')

    @skip_if_linux()
    def test_cmdline(self):
        self.execute('cmdline')

    @skip_if_linux()
    def test_exe(self):
        self.execute('exe')

    @skip_if_linux()
    def test_ppid(self):
        self.execute('ppid')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_uids(self):
        self.execute('uids')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_gids(self):
        self.execute('gids')

    @skip_if_linux()
    def test_status(self):
        self.execute('status')

    def test_nice_get(self):
        self.execute('nice')

    def test_nice_set(self):
        # Re-apply the current niceness so the process is left unchanged.
        niceness = psutil.Process().nice()
        self.execute('nice', niceness)

    @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
                         "Linux and Windows Vista only")
    def test_ionice_get(self):
        self.execute('ionice')

    @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
                         "Linux and Windows Vista only")
    def test_ionice_set(self):
        if WINDOWS:
            value = psutil.Process().ionice()
            self.execute('ionice', value)
        else:
            from psutil._pslinux import cext
            self.execute('ionice', psutil.IOPRIO_CLASS_NONE)
            # Also exercise the error path of the C function directly.
            fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
            self.execute_w_exc(OSError, fun)

    @unittest.skipIf(OSX or SUNOS, "feature not supported on this platform")
    @skip_if_linux()
    def test_io_counters(self):
        self.execute('io_counters')

    @unittest.skipUnless(WINDOWS, "not worth being tested on posix")
    def test_username(self):
        self.execute('username')

    @skip_if_linux()
    def test_create_time(self):
        self.execute('create_time')

    @skip_if_linux()
    def test_num_threads(self):
        self.execute('num_threads')

    @unittest.skipUnless(WINDOWS, "Windows only")
    def test_num_handles(self):
        self.execute('num_handles')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_num_fds(self):
        self.execute('num_fds')

    @skip_if_linux()
    def test_threads(self):
        self.execute('threads')

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute('cpu_times')

    @skip_if_linux()
    def test_memory_info(self):
        self.execute('memory_info')

    @skip_if_linux()
    def test_memory_info_ex(self):
        self.execute('memory_info_ex')

    @unittest.skipUnless(POSIX, "POSIX only")
    @skip_if_linux()
    def test_terminal(self):
        self.execute('terminal')

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_resume(self):
        self.execute('resume')

    @skip_if_linux()
    def test_cwd(self):
        self.execute('cwd')

    @unittest.skipUnless(WINDOWS or LINUX or BSD,
                         "Windows or Linux or BSD only")
    def test_cpu_affinity_get(self):
        self.execute('cpu_affinity')

    @unittest.skipUnless(WINDOWS or LINUX or BSD,
                         "Windows or Linux or BSD only")
    def test_cpu_affinity_set(self):
        # Re-apply the current affinity so the process is left unchanged.
        affinity = psutil.Process().cpu_affinity()
        self.execute('cpu_affinity', affinity)
        if not TRAVIS:
            self.execute_w_exc(ValueError, 'cpu_affinity', [-1])

    @skip_if_linux()
    def test_open_files(self):
        safe_remove(TESTFN)  # needed after UNIX socket test has run
        with open(TESTFN, 'w'):
            self.execute('open_files')

    # OSX implementation is unbelievably slow
    @unittest.skipIf(OSX, "OSX implementation is too slow")
    @skip_if_linux()
    def test_memory_maps(self):
        self.execute('memory_maps')

    @unittest.skipUnless(LINUX, "Linux only")
    @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                         "only available on Linux >= 2.6.36")
    def test_rlimit_get(self):
        self.execute('rlimit', psutil.RLIMIT_NOFILE)

    @unittest.skipUnless(LINUX, "Linux only")
    @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
                         "only available on Linux >= 2.6.36")
    def test_rlimit_set(self):
        # Re-apply the current limit so the process is left unchanged.
        limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)
        self.execute('rlimit', psutil.RLIMIT_NOFILE, limit)
        self.execute_w_exc(OSError, 'rlimit', -1)

    @skip_if_linux()
    # Windows implementation is based on a single system-wide function
    @unittest.skipIf(WINDOWS, "tested later")
    def test_connections(self):
        socks = []

        def create_socket(family, sock_type, address=('', 0)):
            sock = socket.socket(family, sock_type)
            # Track the socket *before* bind()/listen() so that it gets
            # closed by the "finally" below even if one of them fails.
            socks.append(sock)
            sock.bind(address)
            if sock_type == socket.SOCK_STREAM:
                sock.listen(1)
            return sock

        # Everything, socket creation included, happens within the
        # try block so that no socket is leaked on a setup failure.
        try:
            create_socket(socket.AF_INET, socket.SOCK_STREAM)
            create_socket(socket.AF_INET, socket.SOCK_DGRAM)
            if supports_ipv6():
                create_socket(socket.AF_INET6, socket.SOCK_STREAM)
                create_socket(socket.AF_INET6, socket.SOCK_DGRAM)
            if hasattr(socket, 'AF_UNIX'):
                safe_remove(TESTFN)
                create_socket(socket.AF_UNIX, socket.SOCK_STREAM, TESTFN)
            kind = 'all'
            # TODO: UNIX sockets are temporarily implemented by parsing
            # 'pfiles' cmd  output; we don't want that part of the code to
            # be executed.
            if SUNOS:
                kind = 'inet'
            self.execute('connections', kind=kind)
        finally:
            for s in socks:
                s.close()
313
314
def _spawn_dead_process():
    """Return a psutil.Process whose underlying process is gone.

    The object returned by get_test_subprocess() is only used for its
    pid; the process with that pid is killed and waited for before
    returning.
    """
    child = get_test_subprocess()
    dead = psutil.Process(child.pid)
    dead.kill()
    dead.wait()
    return dead


# Shared by the tests exercising methods against a terminated process.
DEAD_PROC = _spawn_dead_process()
320
321
class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
    """Re-run the Process leak tests against DEAD_PROC.

    Looks for leaks occurring on the code paths which deal with a
    process that is gone and hence raise NoSuchProcess.
    """
    proc = DEAD_PROC

    def call(self, *args, **kwargs):
        # NoSuchProcess is the expected outcome here, not a failure.
        parent_call = super(TestProcessObjectLeaksZombie, self).call
        try:
            parent_call(*args, **kwargs)
        except psutil.NoSuchProcess:
            pass

    # These additional tests are only defined on non-POSIX platforms.
    if not POSIX:
        def test_kill(self):
            self.execute('kill')

        def test_terminate(self):
            self.execute('terminate')

        def test_suspend(self):
            self.execute('suspend')

        def test_resume(self):
            self.execute('resume')

        def test_wait(self):
            self.execute('wait')
349
350
class TestModuleFunctionsLeaks(Base):
    """Test leaks of psutil module functions."""

    def setUp(self):
        gc.collect()

    def call(self, function, *args, **kwargs):
        """Invoke *function* once on behalf of execute().

        *function* is either a callable, used as is, or the name of a
        function looked up on the psutil module.
        """
        if callable(function):
            fun = function
        else:
            fun = getattr(psutil, function)
        fun(*args, **kwargs)

    # The two cpu_count tests hand the platform implementation to
    # execute() directly instead of overwriting psutil.cpu_count, which
    # used to leave the public function patched after the test had run.
    @skip_if_linux()
    def test_cpu_count_logical(self):
        self.execute(psutil._psplatform.cpu_count_logical)

    @skip_if_linux()
    def test_cpu_count_physical(self):
        self.execute(psutil._psplatform.cpu_count_physical)

    @skip_if_linux()
    def test_boot_time(self):
        self.execute('boot_time')

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_pid_exists(self):
        self.execute('pid_exists', os.getpid())

    def test_virtual_memory(self):
        self.execute('virtual_memory')

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS,
                     "not worth being tested on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute('swap_memory')

    @skip_if_linux()
    def test_cpu_times(self):
        self.execute('cpu_times')

    @skip_if_linux()
    def test_per_cpu_times(self):
        self.execute('cpu_times', percpu=True)

    @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
                     "not worth being tested on POSIX (pure python)")
    def test_disk_usage(self):
        self.execute('disk_usage', '.')

    def test_disk_partitions(self):
        self.execute('disk_partitions')

    @skip_if_linux()
    def test_net_io_counters(self):
        self.execute('net_io_counters')

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @skip_if_linux()
    def test_disk_io_counters(self):
        self.execute('disk_io_counters')

    # XXX - on Windows this produces a false positive
    @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
    def test_users(self):
        self.execute('users')

    @unittest.skipIf(LINUX,
                     "not worth being tested on Linux (pure python)")
    def test_net_connections(self):
        self.execute('net_connections')

    def test_net_if_addrs(self):
        self.execute('net_if_addrs')

    @unittest.skipIf(TRAVIS, "EPERM on travis")
    def test_net_if_stats(self):
        self.execute('net_if_stats')
431
432
def main():
    """Run all the leak test cases with a verbose text runner.

    Return True if every test succeeded, False otherwise.
    """
    # unittest.makeSuite() is deprecated and no longer available on
    # recent Python versions; an explicit TestLoader is equivalent.
    loader = unittest.TestLoader()
    test_suite = unittest.TestSuite()
    test_classes = [TestProcessObjectLeaksZombie,
                    TestProcessObjectLeaks,
                    TestModuleFunctionsLeaks]
    for test_class in test_classes:
        test_suite.addTest(loader.loadTestsFromTestCase(test_class))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()
442
if __name__ == '__main__':
    # Signal a failed test run to the caller via the exit status.
    all_passed = main()
    if not all_passed:
        sys.exit(1)
446