1#!/usr/bin/env python3
2
3# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""
8Tests for detecting function memory leaks (typically the ones
9implemented in C). It does so by calling a function many times and
10checking whether process memory usage keeps increasing between
11calls or over time.
12Note that this may produce false positives (especially on Windows
13for some reason).
14PyPy appears to be completely unstable for this framework, probably
15because of how its JIT handles memory, so tests are skipped.
16"""
17
18from __future__ import print_function
19import functools
20import os
21
22import psutil
23import psutil._common
24from psutil import LINUX
25from psutil import MACOS
26from psutil import OPENBSD
27from psutil import POSIX
28from psutil import SUNOS
29from psutil import WINDOWS
30from psutil._compat import ProcessLookupError
31from psutil._compat import super
32from psutil.tests import create_sockets
33from psutil.tests import get_testfn
34from psutil.tests import HAS_CPU_AFFINITY
35from psutil.tests import HAS_CPU_FREQ
36from psutil.tests import HAS_ENVIRON
37from psutil.tests import HAS_IONICE
38from psutil.tests import HAS_MEMORY_MAPS
39from psutil.tests import HAS_NET_IO_COUNTERS
40from psutil.tests import HAS_PROC_CPU_NUM
41from psutil.tests import HAS_PROC_IO_COUNTERS
42from psutil.tests import HAS_RLIMIT
43from psutil.tests import HAS_SENSORS_BATTERY
44from psutil.tests import HAS_SENSORS_FANS
45from psutil.tests import HAS_SENSORS_TEMPERATURES
46from psutil.tests import process_namespace
47from psutil.tests import skip_on_access_denied
48from psutil.tests import spawn_testproc
49from psutil.tests import system_namespace
50from psutil.tests import terminate
51from psutil.tests import TestMemoryLeak
52from psutil.tests import unittest
53
54
# Reduced iteration count, used in place of the test class' default
# `times` for calls we want to run faster.
FEW_TIMES = 5
# Process object for the interpreter running these tests.
thisproc = psutil.Process()
# Platform specific extension module exposed by psutil.
cext = psutil._psplatform.cext
58
59
def fewtimes_if_linux():
    """Return a decorator which lowers the iteration count on Linux.

    It is meant for those Linux functions which are implemented in
    pure Python, and which we want to run faster. While the decorated
    test method runs, the `times` attribute of its class is set to
    FEW_TIMES; the previous value is put back afterwards, also if the
    method raises. On the other platforms the method is called as-is.
    """
    def decorator(fun):
        @functools.wraps(fun)
        def wrapper(self, *args, **kwargs):
            if not LINUX:
                return fun(self, *args, **kwargs)
            klass = self.__class__
            original = klass.times
            try:
                klass.times = FEW_TIMES
                return fun(self, *args, **kwargs)
            finally:
                # restore the class-wide setting no matter what
                klass.times = original
        return wrapper
    return decorator
78
79
80# ===================================================================
81# Process class
82# ===================================================================
83
84
class TestProcessObjectLeaks(TestMemoryLeak):
    """Look for memory leaks in the methods of the Process class."""

    # Process instance all the tests below call into.
    proc = thisproc

    def test_coverage(self):
        # Hand the Process getters + setters over to the namespace's
        # class coverage check.
        namespace = process_namespace(None)
        namespace.test_class_coverage(
            self, namespace.getters + namespace.setters)

    # --- identity / basic info

    @fewtimes_if_linux()
    def test_name(self):
        self.execute(self.proc.name)

    @fewtimes_if_linux()
    def test_cmdline(self):
        self.execute(self.proc.cmdline)

    @fewtimes_if_linux()
    def test_exe(self):
        self.execute(self.proc.exe)

    @fewtimes_if_linux()
    def test_ppid(self):
        self.execute(self.proc.ppid)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_uids(self):
        self.execute(self.proc.uids)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_gids(self):
        self.execute(self.proc.gids)

    @fewtimes_if_linux()
    def test_status(self):
        self.execute(self.proc.status)

    # --- priorities

    def test_nice(self):
        self.execute(self.proc.nice)

    def test_nice_set(self):
        # The value being set is the one this process already has.
        current = thisproc.nice()

        def set_nice():
            return self.proc.nice(current)

        self.execute(set_nice)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice(self):
        self.execute(self.proc.ionice)

    @unittest.skipIf(not HAS_IONICE, "not supported")
    def test_ionice_set(self):
        if not WINDOWS:
            def set_class_none():
                return self.proc.ionice(psutil.IOPRIO_CLASS_NONE)

            self.execute(set_class_none)
            # This call into the C function is expected to raise
            # OSError.
            bad_call = functools.partial(
                cext.proc_ioprio_set, os.getpid(), -1, 0)
            self.execute_w_exc(OSError, bad_call)
        else:
            # The value being set is the one this process already has.
            current = thisproc.ionice()

            def set_current():
                return self.proc.ionice(current)

            self.execute(set_current)

    # --- counters and misc info

    @unittest.skipIf(not HAS_PROC_IO_COUNTERS, "not supported")
    @fewtimes_if_linux()
    def test_io_counters(self):
        self.execute(self.proc.io_counters)

    @unittest.skipIf(POSIX, "worthless on POSIX")
    def test_username(self):
        # On Windows 1 handle is always opened (only once), so do a
        # first call before measuring.
        psutil.Process().username()
        self.execute(self.proc.username)

    @fewtimes_if_linux()
    def test_create_time(self):
        self.execute(self.proc.create_time)

    @fewtimes_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_num_threads(self):
        self.execute(self.proc.num_threads)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_num_handles(self):
        self.execute(self.proc.num_handles)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_num_fds(self):
        self.execute(self.proc.num_fds)

    @fewtimes_if_linux()
    def test_num_ctx_switches(self):
        self.execute(self.proc.num_ctx_switches)

    @fewtimes_if_linux()
    @skip_on_access_denied(only_if=OPENBSD)
    def test_threads(self):
        self.execute(self.proc.threads)

    # --- cpu / memory

    @fewtimes_if_linux()
    def test_cpu_times(self):
        self.execute(self.proc.cpu_times)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
    def test_cpu_num(self):
        self.execute(self.proc.cpu_num)

    @fewtimes_if_linux()
    def test_memory_info(self):
        self.execute(self.proc.memory_info)

    @fewtimes_if_linux()
    def test_memory_full_info(self):
        self.execute(self.proc.memory_full_info)

    @unittest.skipIf(not POSIX, "POSIX only")
    @fewtimes_if_linux()
    def test_terminal(self):
        self.execute(self.proc.terminal)

    def test_resume(self):
        # Only a few iterations on POSIX, the class default elsewhere.
        if POSIX:
            times = FEW_TIMES
        else:
            times = self.times
        self.execute(self.proc.resume, times=times)

    @fewtimes_if_linux()
    def test_cwd(self):
        self.execute(self.proc.cwd)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity(self):
        self.execute(self.proc.cpu_affinity)

    @unittest.skipIf(not HAS_CPU_AFFINITY, "not supported")
    def test_cpu_affinity_set(self):
        current = thisproc.cpu_affinity()

        def set_current():
            return self.proc.cpu_affinity(current)

        def set_invalid():
            return self.proc.cpu_affinity([-1])

        self.execute(set_current)
        # An invalid CPU number is expected to raise ValueError.
        self.execute_w_exc(ValueError, set_invalid)

    # --- files, maps, limits, sockets

    @fewtimes_if_linux()
    def test_open_files(self):
        # Keep a file open for the whole duration of the check.
        with open(get_testfn(), 'w'):
            self.execute(self.proc.open_files)

    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
    @fewtimes_if_linux()
    def test_memory_maps(self):
        self.execute(self.proc.memory_maps)

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit(self):
        def get_nofile():
            return self.proc.rlimit(psutil.RLIMIT_NOFILE)

        self.execute(get_nofile)

    @unittest.skipIf(not LINUX, "LINUX only")
    @unittest.skipIf(not HAS_RLIMIT, "not supported")
    def test_rlimit_set(self):
        # The limit being set is the one this process already has.
        current = thisproc.rlimit(psutil.RLIMIT_NOFILE)

        def set_current():
            return self.proc.rlimit(psutil.RLIMIT_NOFILE, current)

        def bad_resource():
            return self.proc.rlimit(-1)

        self.execute(set_current)
        self.execute_w_exc((OSError, ValueError), bad_resource)

    @fewtimes_if_linux()
    # The Windows implementation relies on a single system-wide
    # function, which is tested later.
    @unittest.skipIf(WINDOWS, "worthless on WINDOWS")
    def test_connections(self):
        # TODO: UNIX sockets are temporarily implemented by parsing
        # the output of the 'pfiles' cmd; we don't want that part of
        # the code to be executed.
        with create_sockets():
            if SUNOS:
                kind = 'inet'
            else:
                kind = 'all'

            def get_connections():
                return self.proc.connections(kind)

            self.execute(get_connections)

    @unittest.skipIf(not HAS_ENVIRON, "not supported")
    def test_environ(self):
        self.execute(self.proc.environ)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_proc_info(self):
        def get_info():
            return cext.proc_info(os.getpid())

        self.execute(get_info)
265
266
class TestTerminatedProcessLeaks(TestProcessObjectLeaks):
    """Repeat the tests above looking for leaks occurring when dealing
    with terminated processes raising NoSuchProcess exception.
    The C functions are still invoked but will follow different code
    paths. We'll check those code paths.
    """

    @classmethod
    def setUpClass(cls):
        """Spawn a test subprocess, kill it, and use it as `cls.proc`."""
        super().setUpClass()
        cls.subp = spawn_testproc()
        try:
            cls.proc = psutil.Process(cls.subp.pid)
            cls.proc.kill()
            cls.proc.wait()
        except Exception:
            # unittest does not call tearDownClass() if setUpClass()
            # raises, so the subprocess must be cleaned up here.
            terminate(cls.subp)
            raise

    @classmethod
    def tearDownClass(cls):
        """Release the subprocess spawned by setUpClass()."""
        try:
            super().tearDownClass()
        finally:
            # run this also if the parent class teardown fails
            terminate(cls.subp)

    def call(self, fun):
        """Call `fun`, ignoring the NoSuchProcess exception which is
        expected since the target process is gone.
        """
        try:
            fun()
        except psutil.NoSuchProcess:
            pass

    if WINDOWS:

        # Extra methods checked against the terminated process on
        # Windows only.

        def test_kill(self):
            self.execute(self.proc.kill)

        def test_terminate(self):
            self.execute(self.proc.terminate)

        def test_suspend(self):
            self.execute(self.proc.suspend)

        def test_resume(self):
            self.execute(self.proc.resume)

        def test_wait(self):
            self.execute(self.proc.wait)

        def test_proc_info(self):
            # test dual implementation
            def call():
                # The C function is called directly, hence the
                # "process is gone" error is handled here.
                try:
                    return cext.proc_info(self.proc.pid)
                except ProcessLookupError:
                    pass

            self.execute(call)
319
320
@unittest.skipIf(not WINDOWS, "WINDOWS only")
class TestProcessDualImplementation(TestMemoryLeak):
    """Call cext.proc_cmdline() with both values of `use_peb`."""

    def test_cmdline_peb_true(self):
        def cmdline_with_peb():
            return cext.proc_cmdline(os.getpid(), use_peb=True)

        self.execute(cmdline_with_peb)

    def test_cmdline_peb_false(self):
        def cmdline_without_peb():
            return cext.proc_cmdline(os.getpid(), use_peb=False)

        self.execute(cmdline_without_peb)
329
330
331# ===================================================================
332# system APIs
333# ===================================================================
334
335
class TestModuleFunctionsLeaks(TestMemoryLeak):
    """Look for memory leaks in the psutil module level functions."""

    def test_coverage(self):
        # Hand all the system functions over to the namespace's class
        # coverage check.
        namespace = system_namespace()
        namespace.test_class_coverage(self, namespace.all)

    # --- cpu

    @fewtimes_if_linux()
    def test_cpu_count(self):  # logical
        def count_logical():
            return psutil.cpu_count(logical=True)

        self.execute(count_logical)

    @fewtimes_if_linux()
    def test_cpu_count_physical(self):
        def count_physical():
            return psutil.cpu_count(logical=False)

        self.execute(count_physical)

    @fewtimes_if_linux()
    def test_cpu_times(self):
        self.execute(psutil.cpu_times)

    @fewtimes_if_linux()
    def test_per_cpu_times(self):
        def per_cpu_times():
            return psutil.cpu_times(percpu=True)

        self.execute(per_cpu_times)

    @fewtimes_if_linux()
    def test_cpu_stats(self):
        self.execute(psutil.cpu_stats)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
    def test_cpu_freq(self):
        self.execute(psutil.cpu_freq)

    @unittest.skipIf(not WINDOWS, "WINDOWS only")
    def test_getloadavg(self):
        # One call is made up front, before measuring.
        psutil.getloadavg()
        self.execute(psutil.getloadavg)

    # --- mem

    def test_virtual_memory(self):
        self.execute(psutil.virtual_memory)

    # TODO: remove this skip when this gets fixed
    @unittest.skipIf(SUNOS, "worthless on SUNOS (uses a subprocess)")
    def test_swap_memory(self):
        self.execute(psutil.swap_memory)

    def test_pid_exists(self):
        # Only a few iterations on POSIX, the class default elsewhere.
        if POSIX:
            times = FEW_TIMES
        else:
            times = self.times

        def own_pid_exists():
            return psutil.pid_exists(os.getpid())

        self.execute(own_pid_exists, times=times)

    # --- disk

    def test_disk_usage(self):
        # Only a few iterations on POSIX, the class default elsewhere.
        if POSIX:
            times = FEW_TIMES
        else:
            times = self.times

        def usage_of_cwd():
            return psutil.disk_usage('.')

        self.execute(usage_of_cwd, times=times)

    def test_disk_partitions(self):
        self.execute(psutil.disk_partitions)

    @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
                     '/proc/diskstats not available on this Linux version')
    @fewtimes_if_linux()
    def test_disk_io_counters(self):
        def io_counters():
            return psutil.disk_io_counters(nowrap=False)

        self.execute(io_counters)

    # --- proc

    @fewtimes_if_linux()
    def test_pids(self):
        self.execute(psutil.pids)

    # --- net

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_NET_IO_COUNTERS, 'not supported')
    def test_net_io_counters(self):
        def io_counters():
            return psutil.net_io_counters(nowrap=False)

        self.execute(io_counters)

    @fewtimes_if_linux()
    @unittest.skipIf(MACOS and os.getuid() != 0, "need root access")
    def test_net_connections(self):
        # On Windows a handle is always opened (once), so do a first
        # call before measuring.
        psutil.net_connections(kind='all')
        with create_sockets():
            def all_connections():
                return psutil.net_connections(kind='all')

            self.execute(all_connections)

    def test_net_if_addrs(self):
        # Note: verified that on Windows this was a false positive,
        # hence the custom tolerance there.
        if WINDOWS:
            tolerance = 80 * 1024
        else:
            tolerance = self.tolerance
        self.execute(psutil.net_if_addrs, tolerance=tolerance)

    def test_net_if_stats(self):
        self.execute(psutil.net_if_stats)

    # --- sensors

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_BATTERY, "not supported")
    def test_sensors_battery(self):
        self.execute(psutil.sensors_battery)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_TEMPERATURES, "not supported")
    def test_sensors_temperatures(self):
        self.execute(psutil.sensors_temperatures)

    @fewtimes_if_linux()
    @unittest.skipIf(not HAS_SENSORS_FANS, "not supported")
    def test_sensors_fans(self):
        self.execute(psutil.sensors_fans)

    # --- others

    @fewtimes_if_linux()
    def test_boot_time(self):
        self.execute(psutil.boot_time)

    def test_users(self):
        self.execute(psutil.users)

    if WINDOWS:

        # --- win services

        def test_win_service_iter(self):
            self.execute(cext.winservice_enumerate)

        def test_win_service_get(self):
            # Intentionally empty.
            # NOTE(review): presumably here only so that
            # test_coverage() finds a test with this name - confirm.
            pass

        def test_win_service_get_config(self):
            # Use the first service yielded by win_service_iter().
            service = next(psutil.win_service_iter())
            name = service.name()

            def query_config():
                return cext.winservice_query_config(name)

            self.execute(query_config)

        def test_win_service_get_status(self):
            # Use the first service yielded by win_service_iter().
            service = next(psutil.win_service_iter())
            name = service.name()

            def query_status():
                return cext.winservice_query_status(name)

            self.execute(query_status)

        def test_win_service_get_description(self):
            # Use the first service yielded by win_service_iter().
            service = next(psutil.win_service_iter())
            name = service.name()

            def query_descr():
                return cext.winservice_query_descr(name)

            self.execute(query_descr)
480
481
if __name__ == '__main__':
    # Run the tests of this file by using psutil's own test runner.
    from psutil.tests import runner
    runner.run_from_name(__file__)
485