1#!/usr/bin/env python3
3# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
7"""Tests for psutil.Process class."""
9import collections
10import errno
11import getpass
12import itertools
13import os
14import signal
15import socket
16import stat
17import subprocess
18import sys
19import textwrap
20import time
21import types
23import psutil
25from psutil import AIX
26from psutil import BSD
27from psutil import LINUX
28from psutil import MACOS
29from psutil import NETBSD
30from psutil import OPENBSD
31from psutil import OSX
32from psutil import POSIX
33from psutil import SUNOS
34from psutil import WINDOWS
35from psutil._common import open_text
36from psutil._compat import FileNotFoundError
37from psutil._compat import long
38from psutil._compat import PY3
39from psutil._compat import super
40from psutil.tests import APPVEYOR
41from psutil.tests import call_until
42from psutil.tests import CI_TESTING
43from psutil.tests import copyload_shared_lib
44from psutil.tests import create_exe
45from psutil.tests import GITHUB_ACTIONS
46from psutil.tests import GLOBAL_TIMEOUT
47from psutil.tests import HAS_CPU_AFFINITY
48from psutil.tests import HAS_ENVIRON
49from psutil.tests import HAS_IONICE
50from psutil.tests import HAS_MEMORY_MAPS
51from psutil.tests import HAS_PROC_CPU_NUM
52from psutil.tests import HAS_PROC_IO_COUNTERS
53from psutil.tests import HAS_RLIMIT
54from psutil.tests import HAS_THREADS
55from psutil.tests import mock
56from psutil.tests import process_namespace
57from psutil.tests import PsutilTestCase
58from psutil.tests import PYPY
59from psutil.tests import PYTHON_EXE
60from psutil.tests import reap_children
61from psutil.tests import retry_on_failure
62from psutil.tests import sh
63from psutil.tests import skip_on_access_denied
64from psutil.tests import skip_on_not_implemented
65from psutil.tests import ThreadTask
66from psutil.tests import unittest
67from psutil.tests import wait_for_pid
70# ===================================================================
71# --- psutil.Process class tests
72# ===================================================================
75class TestProcess(PsutilTestCase):
76    """Tests for psutil.Process class."""
78    def spawn_psproc(self, *args, **kwargs):
79        sproc = self.spawn_testproc(*args, **kwargs)
80        return psutil.Process(sproc.pid)
82    # ---
84    def test_pid(self):
85        p = psutil.Process()
86        self.assertEqual(p.pid, os.getpid())
87        with self.assertRaises(AttributeError):
88            p.pid = 33
90    def test_kill(self):
91        p = self.spawn_psproc()
92        p.kill()
93        code = p.wait()
94        if WINDOWS:
95            self.assertEqual(code, signal.SIGTERM)
96        else:
97            self.assertEqual(code, -signal.SIGKILL)
98        self.assertProcessGone(p)
100    def test_terminate(self):
101        p = self.spawn_psproc()
102        p.terminate()
103        code = p.wait()
104        if WINDOWS:
105            self.assertEqual(code, signal.SIGTERM)
106        else:
107            self.assertEqual(code, -signal.SIGTERM)
108        self.assertProcessGone(p)
110    def test_send_signal(self):
111        sig = signal.SIGKILL if POSIX else signal.SIGTERM
112        p = self.spawn_psproc()
113        p.send_signal(sig)
114        code = p.wait()
115        if WINDOWS:
116            self.assertEqual(code, sig)
117        else:
118            self.assertEqual(code, -sig)
119        self.assertProcessGone(p)
121    @unittest.skipIf(not POSIX, "not POSIX")
122    def test_send_signal_mocked(self):
123        sig = signal.SIGTERM
124        p = self.spawn_psproc()
125        with mock.patch('psutil.os.kill',
126                        side_effect=OSError(errno.ESRCH, "")):
127            self.assertRaises(psutil.NoSuchProcess, p.send_signal, sig)
129        p = self.spawn_psproc()
130        with mock.patch('psutil.os.kill',
131                        side_effect=OSError(errno.EPERM, "")):
132            self.assertRaises(psutil.AccessDenied, p.send_signal, sig)
134    def test_wait_exited(self):
135        # Test waitpid() + WIFEXITED -> WEXITSTATUS.
136        # normal return, same as exit(0)
137        cmd = [PYTHON_EXE, "-c", "pass"]
138        p = self.spawn_psproc(cmd)
139        code = p.wait()
140        self.assertEqual(code, 0)
141        self.assertProcessGone(p)
142        # exit(1), implicit in case of error
143        cmd = [PYTHON_EXE, "-c", "1 / 0"]
144        p = self.spawn_psproc(cmd, stderr=subprocess.PIPE)
145        code = p.wait()
146        self.assertEqual(code, 1)
147        self.assertProcessGone(p)
148        # via sys.exit()
149        cmd = [PYTHON_EXE, "-c", "import sys; sys.exit(5);"]
150        p = self.spawn_psproc(cmd)
151        code = p.wait()
152        self.assertEqual(code, 5)
153        self.assertProcessGone(p)
154        # via os._exit()
155        cmd = [PYTHON_EXE, "-c", "import os; os._exit(5);"]
156        p = self.spawn_psproc(cmd)
157        code = p.wait()
158        self.assertEqual(code, 5)
159        self.assertProcessGone(p)
161    def test_wait_stopped(self):
162        p = self.spawn_psproc()
163        if POSIX:
164            # Test waitpid() + WIFSTOPPED and WIFCONTINUED.
165            # Note: if a process is stopped it ignores SIGTERM.
166            p.send_signal(signal.SIGSTOP)
167            self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
168            p.send_signal(signal.SIGCONT)
169            self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
170            p.send_signal(signal.SIGTERM)
171            self.assertEqual(p.wait(), -signal.SIGTERM)
172            self.assertEqual(p.wait(), -signal.SIGTERM)
173        else:
174            p.suspend()
175            self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
176            p.resume()
177            self.assertRaises(psutil.TimeoutExpired, p.wait, timeout=0.001)
178            p.terminate()
179            self.assertEqual(p.wait(), signal.SIGTERM)
180            self.assertEqual(p.wait(), signal.SIGTERM)
182    def test_wait_non_children(self):
183        # Test wait() against a process which is not our direct
184        # child.
185        child, grandchild = self.spawn_children_pair()
186        self.assertRaises(psutil.TimeoutExpired, child.wait, 0.01)
187        self.assertRaises(psutil.TimeoutExpired, grandchild.wait, 0.01)
188        # We also terminate the direct child otherwise the
189        # grandchild will hang until the parent is gone.
190        child.terminate()
191        grandchild.terminate()
192        child_ret = child.wait()
193        grandchild_ret = grandchild.wait()
194        if POSIX:
195            self.assertEqual(child_ret, -signal.SIGTERM)
196            # For processes which are not our children we're supposed
197            # to get None.
198            self.assertEqual(grandchild_ret, None)
199        else:
200            self.assertEqual(child_ret, signal.SIGTERM)
201            self.assertEqual(child_ret, signal.SIGTERM)
203    def test_wait_timeout(self):
204        p = self.spawn_psproc()
205        p.name()
206        self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
207        self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
208        self.assertRaises(ValueError, p.wait, -1)
210    def test_wait_timeout_nonblocking(self):
211        p = self.spawn_psproc()
212        self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
213        p.kill()
214        stop_at = time.time() + GLOBAL_TIMEOUT
215        while time.time() < stop_at:
216            try:
217                code = p.wait(0)
218                break
219            except psutil.TimeoutExpired:
220                pass
221        else:
222            raise self.fail('timeout')
223        if POSIX:
224            self.assertEqual(code, -signal.SIGKILL)
225        else:
226            self.assertEqual(code, signal.SIGTERM)
227        self.assertProcessGone(p)
229    def test_cpu_percent(self):
230        p = psutil.Process()
231        p.cpu_percent(interval=0.001)
232        p.cpu_percent(interval=0.001)
233        for x in range(100):
234            percent = p.cpu_percent(interval=None)
235            self.assertIsInstance(percent, float)
236            self.assertGreaterEqual(percent, 0.0)
237        with self.assertRaises(ValueError):
238            p.cpu_percent(interval=-1)
240    def test_cpu_percent_numcpus_none(self):
241        # See: https://github.com/giampaolo/psutil/issues/1087
242        with mock.patch('psutil.cpu_count', return_value=None) as m:
243            psutil.Process().cpu_percent()
244            assert m.called
246    def test_cpu_times(self):
247        times = psutil.Process().cpu_times()
248        assert (times.user > 0.0) or (times.system > 0.0), times
249        assert (times.children_user >= 0.0), times
250        assert (times.children_system >= 0.0), times
251        if LINUX:
252            assert times.iowait >= 0.0, times
253        # make sure returned values can be pretty printed with strftime
254        for name in times._fields:
255            time.strftime("%H:%M:%S", time.localtime(getattr(times, name)))
257    def test_cpu_times_2(self):
258        user_time, kernel_time = psutil.Process().cpu_times()[:2]
259        utime, ktime = os.times()[:2]
261        # Use os.times()[:2] as base values to compare our results
262        # using a tolerance  of +/- 0.1 seconds.
263        # It will fail if the difference between the values is > 0.1s.
264        if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
265            self.fail("expected: %s, found: %s" % (utime, user_time))
267        if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
268            self.fail("expected: %s, found: %s" % (ktime, kernel_time))
270    @unittest.skipIf(not HAS_PROC_CPU_NUM, "not supported")
271    def test_cpu_num(self):
272        p = psutil.Process()
273        num = p.cpu_num()
274        self.assertGreaterEqual(num, 0)
275        if psutil.cpu_count() == 1:
276            self.assertEqual(num, 0)
277        self.assertIn(p.cpu_num(), range(psutil.cpu_count()))
279    def test_create_time(self):
280        p = self.spawn_psproc()
281        now = time.time()
282        create_time = p.create_time()
284        # Use time.time() as base value to compare our result using a
285        # tolerance of +/- 1 second.
286        # It will fail if the difference between the values is > 2s.
287        difference = abs(create_time - now)
288        if difference > 2:
289            self.fail("expected: %s, found: %s, difference: %s"
290                      % (now, create_time, difference))
292        # make sure returned value can be pretty printed with strftime
293        time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time()))
295    @unittest.skipIf(not POSIX, 'POSIX only')
296    def test_terminal(self):
297        terminal = psutil.Process().terminal()
298        if terminal is not None:
299            tty = os.path.realpath(sh('tty'))
300            self.assertEqual(terminal, tty)
302    @unittest.skipIf(not HAS_PROC_IO_COUNTERS, 'not supported')
303    @skip_on_not_implemented(only_if=LINUX)
304    def test_io_counters(self):
305        p = psutil.Process()
306        # test reads
307        io1 = p.io_counters()
308        with open(PYTHON_EXE, 'rb') as f:
309            f.read()
310        io2 = p.io_counters()
311        if not BSD and not AIX:
312            self.assertGreater(io2.read_count, io1.read_count)
313            self.assertEqual(io2.write_count, io1.write_count)
314            if LINUX:
315                self.assertGreater(io2.read_chars, io1.read_chars)
316                self.assertEqual(io2.write_chars, io1.write_chars)
317        else:
318            self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
319            self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
321        # test writes
322        io1 = p.io_counters()
323        with open(self.get_testfn(), 'wb') as f:
324            if PY3:
325                f.write(bytes("x" * 1000000, 'ascii'))
326            else:
327                f.write("x" * 1000000)
328        io2 = p.io_counters()
329        self.assertGreaterEqual(io2.write_count, io1.write_count)
330        self.assertGreaterEqual(io2.write_bytes, io1.write_bytes)
331        self.assertGreaterEqual(io2.read_count, io1.read_count)
332        self.assertGreaterEqual(io2.read_bytes, io1.read_bytes)
333        if LINUX:
334            self.assertGreater(io2.write_chars, io1.write_chars)
335            self.assertGreaterEqual(io2.read_chars, io1.read_chars)
337        # sanity check
338        for i in range(len(io2)):
339            if BSD and i >= 2:
340                # On BSD read_bytes and write_bytes are always set to -1.
341                continue
342            self.assertGreaterEqual(io2[i], 0)
343            self.assertGreaterEqual(io2[i], 0)
345    @unittest.skipIf(not HAS_IONICE, "not supported")
346    @unittest.skipIf(not LINUX, "linux only")
347    def test_ionice_linux(self):
348        p = psutil.Process()
349        if not CI_TESTING:
350            self.assertEqual(p.ionice()[0], psutil.IOPRIO_CLASS_NONE)
351        self.assertEqual(psutil.IOPRIO_CLASS_NONE, 0)
352        self.assertEqual(psutil.IOPRIO_CLASS_RT, 1)  # high
353        self.assertEqual(psutil.IOPRIO_CLASS_BE, 2)  # normal
354        self.assertEqual(psutil.IOPRIO_CLASS_IDLE, 3)  # low
355        init = p.ionice()
356        try:
357            # low
358            p.ionice(psutil.IOPRIO_CLASS_IDLE)
359            self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_IDLE, 0))
360            with self.assertRaises(ValueError):  # accepts no value
361                p.ionice(psutil.IOPRIO_CLASS_IDLE, value=7)
362            # normal
363            p.ionice(psutil.IOPRIO_CLASS_BE)
364            self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 0))
365            p.ionice(psutil.IOPRIO_CLASS_BE, value=7)
366            self.assertEqual(tuple(p.ionice()), (psutil.IOPRIO_CLASS_BE, 7))
367            with self.assertRaises(ValueError):
368                p.ionice(psutil.IOPRIO_CLASS_BE, value=8)
369            try:
370                p.ionice(psutil.IOPRIO_CLASS_RT, value=7)
371            except psutil.AccessDenied:
372                pass
373            # errs
374            self.assertRaisesRegex(
375                ValueError, "ioclass accepts no value",
376                p.ionice, psutil.IOPRIO_CLASS_NONE, 1)
377            self.assertRaisesRegex(
378                ValueError, "ioclass accepts no value",
379                p.ionice, psutil.IOPRIO_CLASS_IDLE, 1)
380            self.assertRaisesRegex(
381                ValueError, "'ioclass' argument must be specified",
382                p.ionice, value=1)
383        finally:
384            ioclass, value = init
385            if ioclass == psutil.IOPRIO_CLASS_NONE:
386                value = 0
387            p.ionice(ioclass, value)
389    @unittest.skipIf(not HAS_IONICE, "not supported")
390    @unittest.skipIf(not WINDOWS, 'not supported on this win version')
391    def test_ionice_win(self):
392        p = psutil.Process()
393        if not CI_TESTING:
394            self.assertEqual(p.ionice(), psutil.IOPRIO_NORMAL)
395        init = p.ionice()
396        try:
397            # base
398            p.ionice(psutil.IOPRIO_VERYLOW)
399            self.assertEqual(p.ionice(), psutil.IOPRIO_VERYLOW)
400            p.ionice(psutil.IOPRIO_LOW)
401            self.assertEqual(p.ionice(), psutil.IOPRIO_LOW)
402            try:
403                p.ionice(psutil.IOPRIO_HIGH)
404            except psutil.AccessDenied:
405                pass
406            else:
407                self.assertEqual(p.ionice(), psutil.IOPRIO_HIGH)
408            # errs
409            self.assertRaisesRegex(
410                TypeError, "value argument not accepted on Windows",
411                p.ionice, psutil.IOPRIO_NORMAL, value=1)
412            self.assertRaisesRegex(
413                ValueError, "is not a valid priority",
414                p.ionice, psutil.IOPRIO_HIGH + 1)
415        finally:
416            p.ionice(init)
418    @unittest.skipIf(not HAS_RLIMIT, "not supported")
419    def test_rlimit_get(self):
420        import resource
421        p = psutil.Process(os.getpid())
422        names = [x for x in dir(psutil) if x.startswith('RLIMIT')]
423        assert names, names
424        for name in names:
425            value = getattr(psutil, name)
426            self.assertGreaterEqual(value, 0)
427            if name in dir(resource):
428                self.assertEqual(value, getattr(resource, name))
429                # XXX - On PyPy RLIMIT_INFINITY returned by
430                # resource.getrlimit() is reported as a very big long
431                # number instead of -1. It looks like a bug with PyPy.
432                if PYPY:
433                    continue
434                self.assertEqual(p.rlimit(value), resource.getrlimit(value))
435            else:
436                ret = p.rlimit(value)
437                self.assertEqual(len(ret), 2)
438                self.assertGreaterEqual(ret[0], -1)
439                self.assertGreaterEqual(ret[1], -1)
441    @unittest.skipIf(not HAS_RLIMIT, "not supported")
442    def test_rlimit_set(self):
443        p = self.spawn_psproc()
444        p.rlimit(psutil.RLIMIT_NOFILE, (5, 5))
445        self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5))
446        # If pid is 0 prlimit() applies to the calling process and
447        # we don't want that.
448        if LINUX:
449            with self.assertRaisesRegex(ValueError, "can't use prlimit"):
450                psutil._psplatform.Process(0).rlimit(0)
451        with self.assertRaises(ValueError):
452            p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5))
454    @unittest.skipIf(not HAS_RLIMIT, "not supported")
455    def test_rlimit(self):
456        p = psutil.Process()
457        testfn = self.get_testfn()
458        soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
459        try:
460            p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
461            with open(testfn, "wb") as f:
462                f.write(b"X" * 1024)
463            # write() or flush() doesn't always cause the exception
464            # but close() will.
465            with self.assertRaises(IOError) as exc:
466                with open(testfn, "wb") as f:
467                    f.write(b"X" * 1025)
468            self.assertEqual(exc.exception.errno if PY3 else exc.exception[0],
469                             errno.EFBIG)
470        finally:
471            p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
472            self.assertEqual(p.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
474    @unittest.skipIf(not HAS_RLIMIT, "not supported")
475    def test_rlimit_infinity(self):
476        # First set a limit, then re-set it by specifying INFINITY
477        # and assume we overridden the previous limit.
478        p = psutil.Process()
479        soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
480        try:
481            p.rlimit(psutil.RLIMIT_FSIZE, (1024, hard))
482            p.rlimit(psutil.RLIMIT_FSIZE, (psutil.RLIM_INFINITY, hard))
483            with open(self.get_testfn(), "wb") as f:
484                f.write(b"X" * 2048)
485        finally:
486            p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
487            self.assertEqual(p.rlimit(psutil.RLIMIT_FSIZE), (soft, hard))
489    @unittest.skipIf(not HAS_RLIMIT, "not supported")
490    def test_rlimit_infinity_value(self):
491        # RLIMIT_FSIZE should be RLIM_INFINITY, which will be a really
492        # big number on a platform with large file support.  On these
493        # platforms we need to test that the get/setrlimit functions
494        # properly convert the number to a C long long and that the
495        # conversion doesn't raise an error.
496        p = psutil.Process()
497        soft, hard = p.rlimit(psutil.RLIMIT_FSIZE)
498        self.assertEqual(psutil.RLIM_INFINITY, hard)
499        p.rlimit(psutil.RLIMIT_FSIZE, (soft, hard))
501    def test_num_threads(self):
502        # on certain platforms such as Linux we might test for exact
503        # thread number, since we always have with 1 thread per process,
504        # but this does not apply across all platforms (MACOS, Windows)
505        p = psutil.Process()
506        if OPENBSD:
507            try:
508                step1 = p.num_threads()
509            except psutil.AccessDenied:
510                raise unittest.SkipTest("on OpenBSD this requires root access")
511        else:
512            step1 = p.num_threads()
514        with ThreadTask():
515            step2 = p.num_threads()
516            self.assertEqual(step2, step1 + 1)
518    @unittest.skipIf(not WINDOWS, 'WINDOWS only')
519    def test_num_handles(self):
520        # a better test is done later into test/_windows.py
521        p = psutil.Process()
522        self.assertGreater(p.num_handles(), 0)
524    @unittest.skipIf(not HAS_THREADS, 'not supported')
525    def test_threads(self):
526        p = psutil.Process()
527        if OPENBSD:
528            try:
529                step1 = p.threads()
530            except psutil.AccessDenied:
531                raise unittest.SkipTest("on OpenBSD this requires root access")
532        else:
533            step1 = p.threads()
535        with ThreadTask():
536            step2 = p.threads()
537            self.assertEqual(len(step2), len(step1) + 1)
538            athread = step2[0]
539            # test named tuple
540            self.assertEqual(athread.id, athread[0])
541            self.assertEqual(athread.user_time, athread[1])
542            self.assertEqual(athread.system_time, athread[2])
544    @retry_on_failure()
545    @skip_on_access_denied(only_if=MACOS)
546    @unittest.skipIf(not HAS_THREADS, 'not supported')
547    def test_threads_2(self):
548        p = self.spawn_psproc()
549        if OPENBSD:
550            try:
551                p.threads()
552            except psutil.AccessDenied:
553                raise unittest.SkipTest(
554                    "on OpenBSD this requires root access")
555        self.assertAlmostEqual(
556            p.cpu_times().user,
557            sum([x.user_time for x in p.threads()]), delta=0.1)
558        self.assertAlmostEqual(
559            p.cpu_times().system,
560            sum([x.system_time for x in p.threads()]), delta=0.1)
562    @retry_on_failure()
563    def test_memory_info(self):
564        p = psutil.Process()
566        # step 1 - get a base value to compare our results
567        rss1, vms1 = p.memory_info()[:2]
568        percent1 = p.memory_percent()
569        self.assertGreater(rss1, 0)
570        self.assertGreater(vms1, 0)
572        # step 2 - allocate some memory
573        memarr = [None] * 1500000
575        rss2, vms2 = p.memory_info()[:2]
576        percent2 = p.memory_percent()
578        # step 3 - make sure that the memory usage bumped up
579        self.assertGreater(rss2, rss1)
580        self.assertGreaterEqual(vms2, vms1)  # vms might be equal
581        self.assertGreater(percent2, percent1)
582        del memarr
584        if WINDOWS:
585            mem = p.memory_info()
586            self.assertEqual(mem.rss, mem.wset)
587            self.assertEqual(mem.vms, mem.pagefile)
589        mem = p.memory_info()
590        for name in mem._fields:
591            self.assertGreaterEqual(getattr(mem, name), 0)
593    def test_memory_full_info(self):
594        p = psutil.Process()
595        total = psutil.virtual_memory().total
596        mem = p.memory_full_info()
597        for name in mem._fields:
598            value = getattr(mem, name)
599            self.assertGreaterEqual(value, 0, msg=(name, value))
600            if name == 'vms' and OSX or LINUX:
601                continue
602            self.assertLessEqual(value, total, msg=(name, value, total))
603        if LINUX or WINDOWS or MACOS:
604            self.assertGreaterEqual(mem.uss, 0)
605        if LINUX:
606            self.assertGreaterEqual(mem.pss, 0)
607            self.assertGreaterEqual(mem.swap, 0)
609    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
610    def test_memory_maps(self):
611        p = psutil.Process()
612        maps = p.memory_maps()
613        paths = [x for x in maps]
614        self.assertEqual(len(paths), len(set(paths)))
615        ext_maps = p.memory_maps(grouped=False)
617        for nt in maps:
618            if not nt.path.startswith('['):
619                assert os.path.isabs(nt.path), nt.path
620                if POSIX:
621                    try:
622                        assert os.path.exists(nt.path) or \
623                            os.path.islink(nt.path), nt.path
624                    except AssertionError:
625                        if not LINUX:
626                            raise
627                        else:
628                            # https://github.com/giampaolo/psutil/issues/759
629                            with open_text('/proc/self/smaps') as f:
630                                data = f.read()
631                            if "%s (deleted)" % nt.path not in data:
632                                raise
633                else:
634                    # XXX - On Windows we have this strange behavior with
635                    # 64 bit dlls: they are visible via explorer but cannot
636                    # be accessed via os.stat() (wtf?).
637                    if '64' not in os.path.basename(nt.path):
638                        try:
639                            st = os.stat(nt.path)
640                        except FileNotFoundError:
641                            pass
642                        else:
643                            assert stat.S_ISREG(st.st_mode), nt.path
644        for nt in ext_maps:
645            for fname in nt._fields:
646                value = getattr(nt, fname)
647                if fname == 'path':
648                    continue
649                elif fname in ('addr', 'perms'):
650                    assert value, value
651                else:
652                    self.assertIsInstance(value, (int, long))
653                    assert value >= 0, value
655    @unittest.skipIf(not HAS_MEMORY_MAPS, "not supported")
656    def test_memory_maps_lists_lib(self):
657        # Make sure a newly loaded shared lib is listed.
658        p = psutil.Process()
659        with copyload_shared_lib() as path:
660            def normpath(p):
661                return os.path.realpath(os.path.normcase(p))
662            libpaths = [normpath(x.path)
663                        for x in p.memory_maps()]
664            self.assertIn(normpath(path), libpaths)
666    def test_memory_percent(self):
667        p = psutil.Process()
668        p.memory_percent()
669        self.assertRaises(ValueError, p.memory_percent, memtype="?!?")
670        if LINUX or MACOS or WINDOWS:
671            p.memory_percent(memtype='uss')
673    def test_is_running(self):
674        p = self.spawn_psproc()
675        assert p.is_running()
676        assert p.is_running()
677        p.kill()
678        p.wait()
679        assert not p.is_running()
680        assert not p.is_running()
682    def test_exe(self):
683        p = self.spawn_psproc()
684        exe = p.exe()
685        try:
686            self.assertEqual(exe, PYTHON_EXE)
687        except AssertionError:
688            if WINDOWS and len(exe) == len(PYTHON_EXE):
689                # on Windows we don't care about case sensitivity
690                normcase = os.path.normcase
691                self.assertEqual(normcase(exe), normcase(PYTHON_EXE))
692            else:
693                # certain platforms such as BSD are more accurate returning:
694                # "/usr/local/bin/python2.7"
695                # ...instead of:
696                # "/usr/local/bin/python"
697                # We do not want to consider this difference in accuracy
698                # an error.
699                ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
700                try:
701                    self.assertEqual(exe.replace(ver, ''),
702                                     PYTHON_EXE.replace(ver, ''))
703                except AssertionError:
704                    # Tipically MACOS. Really not sure what to do here.
705                    pass
707        out = sh([exe, "-c", "import os; print('hey')"])
708        self.assertEqual(out, 'hey')
710    def test_cmdline(self):
711        cmdline = [PYTHON_EXE, "-c", "import time; time.sleep(60)"]
712        p = self.spawn_psproc(cmdline)
713        # XXX - most of the times the underlying sysctl() call on Net
714        # and Open BSD returns a truncated string.
715        # Also /proc/pid/cmdline behaves the same so it looks
716        # like this is a kernel bug.
717        # XXX - AIX truncates long arguments in /proc/pid/cmdline
718        if NETBSD or OPENBSD or AIX:
719            self.assertEqual(p.cmdline()[0], PYTHON_EXE)
720        else:
721            self.assertEqual(' '.join(p.cmdline()), ' '.join(cmdline))
723    @unittest.skipIf(PYPY, "broken on PYPY")
724    def test_long_cmdline(self):
725        testfn = self.get_testfn()
726        create_exe(testfn)
727        cmdline = [testfn] + (["0123456789"] * 20)
728        p = self.spawn_psproc(cmdline)
729        self.assertEqual(p.cmdline(), cmdline)
731    def test_name(self):
732        p = self.spawn_psproc(PYTHON_EXE)
733        name = p.name().lower()
734        pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
735        assert pyexe.startswith(name), (pyexe, name)
737    @unittest.skipIf(PYPY, "unreliable on PYPY")
738    def test_long_name(self):
739        testfn = self.get_testfn(suffix="0123456789" * 2)
740        create_exe(testfn)
741        p = self.spawn_psproc(testfn)
742        self.assertEqual(p.name(), os.path.basename(testfn))
744    # XXX
745    @unittest.skipIf(SUNOS, "broken on SUNOS")
746    @unittest.skipIf(AIX, "broken on AIX")
747    @unittest.skipIf(PYPY, "broken on PYPY")
748    def test_prog_w_funky_name(self):
749        # Test that name(), exe() and cmdline() correctly handle programs
750        # with funky chars such as spaces and ")", see:
751        # https://github.com/giampaolo/psutil/issues/628
752        funky_path = self.get_testfn(suffix='foo bar )')
753        create_exe(funky_path)
754        cmdline = [funky_path, "-c",
755                   "import time; [time.sleep(0.01) for x in range(3000)];"
756                   "arg1", "arg2", "", "arg3", ""]
757        p = self.spawn_psproc(cmdline)
758        self.assertEqual(p.cmdline(), cmdline)
759        self.assertEqual(p.name(), os.path.basename(funky_path))
760        self.assertEqual(os.path.normcase(p.exe()),
761                         os.path.normcase(funky_path))
763    @unittest.skipIf(not POSIX, 'POSIX only')
764    def test_uids(self):
765        p = psutil.Process()
766        real, effective, saved = p.uids()
767        # os.getuid() refers to "real" uid
768        self.assertEqual(real, os.getuid())
769        # os.geteuid() refers to "effective" uid
770        self.assertEqual(effective, os.geteuid())
771        # No such thing as os.getsuid() ("saved" uid), but starting
772        # from python 2.7 we have os.getresuid() which returns all
773        # of them.
774        if hasattr(os, "getresuid"):
775            self.assertEqual(os.getresuid(), p.uids())
777    @unittest.skipIf(not POSIX, 'POSIX only')
778    def test_gids(self):
779        p = psutil.Process()
780        real, effective, saved = p.gids()
781        # os.getuid() refers to "real" uid
782        self.assertEqual(real, os.getgid())
783        # os.geteuid() refers to "effective" uid
784        self.assertEqual(effective, os.getegid())
785        # No such thing as os.getsgid() ("saved" gid), but starting
786        # from python 2.7 we have os.getresgid() which returns all
787        # of them.
788        if hasattr(os, "getresuid"):
789            self.assertEqual(os.getresgid(), p.gids())
791    def test_nice(self):
792        p = psutil.Process()
793        self.assertRaises(TypeError, p.nice, "str")
794        init = p.nice()
795        try:
796            if WINDOWS:
797                for prio in [psutil.NORMAL_PRIORITY_CLASS,
798                             psutil.IDLE_PRIORITY_CLASS,
799                             psutil.BELOW_NORMAL_PRIORITY_CLASS,
800                             psutil.REALTIME_PRIORITY_CLASS,
801                             psutil.HIGH_PRIORITY_CLASS,
802                             psutil.ABOVE_NORMAL_PRIORITY_CLASS]:
803                    with self.subTest(prio=prio):
804                        try:
805                            p.nice(prio)
806                        except psutil.AccessDenied:
807                            pass
808                        else:
809                            self.assertEqual(p.nice(), prio)
810            else:
811                try:
812                    if hasattr(os, "getpriority"):
813                        self.assertEqual(
814                            os.getpriority(os.PRIO_PROCESS, os.getpid()),
815                            p.nice())
816                    p.nice(1)
817                    self.assertEqual(p.nice(), 1)
818                    if hasattr(os, "getpriority"):
819                        self.assertEqual(
820                            os.getpriority(os.PRIO_PROCESS, os.getpid()),
821                            p.nice())
822                    # XXX - going back to previous nice value raises
823                    # AccessDenied on MACOS
824                    if not MACOS:
825                        p.nice(0)
826                        self.assertEqual(p.nice(), 0)
827                except psutil.AccessDenied:
828                    pass
829        finally:
830            try:
831                p.nice(init)
832            except psutil.AccessDenied:
833                pass
835    def test_status(self):
836        p = psutil.Process()
837        self.assertEqual(p.status(), psutil.STATUS_RUNNING)
839    def test_username(self):
840        p = self.spawn_psproc()
841        username = p.username()
842        if WINDOWS:
843            domain, username = username.split('\\')
844            self.assertEqual(username, getpass.getuser())
845            if 'USERDOMAIN' in os.environ:
846                self.assertEqual(domain, os.environ['USERDOMAIN'])
847        else:
848            self.assertEqual(username, getpass.getuser())
850    def test_cwd(self):
851        p = self.spawn_psproc()
852        self.assertEqual(p.cwd(), os.getcwd())
854    def test_cwd_2(self):
855        cmd = [PYTHON_EXE, "-c",
856               "import os, time; os.chdir('..'); time.sleep(60)"]
857        p = self.spawn_psproc(cmd)
858        call_until(p.cwd, "ret == os.path.dirname(os.getcwd())")
860    @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
861    def test_cpu_affinity(self):
862        p = psutil.Process()
863        initial = p.cpu_affinity()
864        assert initial, initial
865        self.addCleanup(p.cpu_affinity, initial)
867        if hasattr(os, "sched_getaffinity"):
868            self.assertEqual(initial, list(os.sched_getaffinity(p.pid)))
869        self.assertEqual(len(initial), len(set(initial)))
871        all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
872        for n in all_cpus:
873            p.cpu_affinity([n])
874            self.assertEqual(p.cpu_affinity(), [n])
875            if hasattr(os, "sched_getaffinity"):
876                self.assertEqual(p.cpu_affinity(),
877                                 list(os.sched_getaffinity(p.pid)))
878            # also test num_cpu()
879            if hasattr(p, "num_cpu"):
880                self.assertEqual(p.cpu_affinity()[0], p.num_cpu())
882        # [] is an alias for "all eligible CPUs"; on Linux this may
883        # not be equal to all available CPUs, see:
884        # https://github.com/giampaolo/psutil/issues/956
885        p.cpu_affinity([])
886        if LINUX:
887            self.assertEqual(p.cpu_affinity(), p._proc._get_eligible_cpus())
888        else:
889            self.assertEqual(p.cpu_affinity(), all_cpus)
890        if hasattr(os, "sched_getaffinity"):
891            self.assertEqual(p.cpu_affinity(),
892                             list(os.sched_getaffinity(p.pid)))
893        #
894        self.assertRaises(TypeError, p.cpu_affinity, 1)
895        p.cpu_affinity(initial)
896        # it should work with all iterables, not only lists
897        p.cpu_affinity(set(all_cpus))
898        p.cpu_affinity(tuple(all_cpus))
900    @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
901    def test_cpu_affinity_errs(self):
902        p = self.spawn_psproc()
903        invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
904        self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
905        self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000))
906        self.assertRaises(TypeError, p.cpu_affinity, [0, "1"])
907        self.assertRaises(ValueError, p.cpu_affinity, [0, -1])
909    @unittest.skipIf(not HAS_CPU_AFFINITY, 'not supported')
910    def test_cpu_affinity_all_combinations(self):
911        p = psutil.Process()
912        initial = p.cpu_affinity()
913        assert initial, initial
914        self.addCleanup(p.cpu_affinity, initial)
916        # All possible CPU set combinations.
917        if len(initial) > 12:
918            initial = initial[:12]  # ...otherwise it will take forever
919        combos = []
920        for i in range(0, len(initial) + 1):
921            for subset in itertools.combinations(initial, i):
922                if subset:
923                    combos.append(list(subset))
925        for combo in combos:
926            p.cpu_affinity(combo)
927            self.assertEqual(sorted(p.cpu_affinity()), sorted(combo))
929    # TODO: #595
930    @unittest.skipIf(BSD, "broken on BSD")
931    # can't find any process file on Appveyor
932    @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
933    def test_open_files(self):
934        p = psutil.Process()
935        testfn = self.get_testfn()
936        files = p.open_files()
937        self.assertNotIn(testfn, files)
938        with open(testfn, 'wb') as f:
939            f.write(b'x' * 1024)
940            f.flush()
941            # give the kernel some time to see the new file
942            files = call_until(p.open_files, "len(ret) != %i" % len(files))
943            filenames = [os.path.normcase(x.path) for x in files]
944            self.assertIn(os.path.normcase(testfn), filenames)
945            if LINUX:
946                for file in files:
947                    if file.path == testfn:
948                        self.assertEqual(file.position, 1024)
949        for file in files:
950            assert os.path.isfile(file.path), file
952        # another process
953        cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % testfn
954        p = self.spawn_psproc([PYTHON_EXE, "-c", cmdline])
956        for x in range(100):
957            filenames = [os.path.normcase(x.path) for x in p.open_files()]
958            if testfn in filenames:
959                break
960            time.sleep(.01)
961        else:
962            self.assertIn(os.path.normcase(testfn), filenames)
963        for file in filenames:
964            assert os.path.isfile(file), file
966    # TODO: #595
967    @unittest.skipIf(BSD, "broken on BSD")
968    # can't find any process file on Appveyor
969    @unittest.skipIf(APPVEYOR, "unreliable on APPVEYOR")
970    def test_open_files_2(self):
971        # test fd and path fields
972        p = psutil.Process()
973        normcase = os.path.normcase
974        testfn = self.get_testfn()
975        with open(testfn, 'w') as fileobj:
976            for file in p.open_files():
977                if normcase(file.path) == normcase(fileobj.name) or \
978                        file.fd == fileobj.fileno():
979                    break
980            else:
981                self.fail("no file found; files=%s" % repr(p.open_files()))
982            self.assertEqual(normcase(file.path), normcase(fileobj.name))
983            if WINDOWS:
984                self.assertEqual(file.fd, -1)
985            else:
986                self.assertEqual(file.fd, fileobj.fileno())
987            # test positions
988            ntuple = p.open_files()[0]
989            self.assertEqual(ntuple[0], ntuple.path)
990            self.assertEqual(ntuple[1], ntuple.fd)
991            # test file is gone
992            self.assertNotIn(fileobj.name, p.open_files())
994    @unittest.skipIf(not POSIX, 'POSIX only')
995    def test_num_fds(self):
996        p = psutil.Process()
997        testfn = self.get_testfn()
998        start = p.num_fds()
999        file = open(testfn, 'w')
1000        self.addCleanup(file.close)
1001        self.assertEqual(p.num_fds(), start + 1)
1002        sock = socket.socket()
1003        self.addCleanup(sock.close)
1004        self.assertEqual(p.num_fds(), start + 2)
1005        file.close()
1006        sock.close()
1007        self.assertEqual(p.num_fds(), start)
1009    @skip_on_not_implemented(only_if=LINUX)
1010    @unittest.skipIf(OPENBSD or NETBSD, "not reliable on OPENBSD & NETBSD")
1011    def test_num_ctx_switches(self):
1012        p = psutil.Process()
1013        before = sum(p.num_ctx_switches())
1014        for x in range(500000):
1015            after = sum(p.num_ctx_switches())
1016            if after > before:
1017                return
1018        self.fail("num ctx switches still the same after 50.000 iterations")
1020    def test_ppid(self):
1021        p = psutil.Process()
1022        if hasattr(os, 'getppid'):
1023            self.assertEqual(p.ppid(), os.getppid())
1024        p = self.spawn_psproc()
1025        self.assertEqual(p.ppid(), os.getpid())
1026        if APPVEYOR:
1027            # Occasional failures, see:
1028            # https://ci.appveyor.com/project/giampaolo/psutil/build/
1029            #     job/0hs623nenj7w4m33
1030            return
1032    def test_parent(self):
1033        p = self.spawn_psproc()
1034        self.assertEqual(p.parent().pid, os.getpid())
1036        lowest_pid = psutil.pids()[0]
1037        self.assertIsNone(psutil.Process(lowest_pid).parent())
1039    def test_parent_multi(self):
1040        parent = psutil.Process()
1041        child, grandchild = self.spawn_children_pair()
1042        self.assertEqual(grandchild.parent(), child)
1043        self.assertEqual(child.parent(), parent)
1045    def test_parent_disappeared(self):
1046        # Emulate a case where the parent process disappeared.
1047        p = self.spawn_psproc()
1048        with mock.patch("psutil.Process",
1049                        side_effect=psutil.NoSuchProcess(0, 'foo')):
1050            self.assertIsNone(p.parent())
1052    @retry_on_failure()
1053    def test_parents(self):
1054        parent = psutil.Process()
1055        assert parent.parents()
1056        child, grandchild = self.spawn_children_pair()
1057        self.assertEqual(child.parents()[0], parent)
1058        self.assertEqual(grandchild.parents()[0], child)
1059        self.assertEqual(grandchild.parents()[1], parent)
1061    def test_children(self):
1062        parent = psutil.Process()
1063        self.assertEqual(parent.children(), [])
1064        self.assertEqual(parent.children(recursive=True), [])
1065        # On Windows we set the flag to 0 in order to cancel out the
1066        # CREATE_NO_WINDOW flag (enabled by default) which creates
1067        # an extra "conhost.exe" child.
1068        child = self.spawn_psproc(creationflags=0)
1069        children1 = parent.children()
1070        children2 = parent.children(recursive=True)
1071        for children in (children1, children2):
1072            self.assertEqual(len(children), 1)
1073            self.assertEqual(children[0].pid, child.pid)
1074            self.assertEqual(children[0].ppid(), parent.pid)
1076    def test_children_recursive(self):
1077        # Test children() against two sub processes, p1 and p2, where
1078        # p1 (our child) spawned p2 (our grandchild).
1079        parent = psutil.Process()
1080        child, grandchild = self.spawn_children_pair()
1081        self.assertEqual(parent.children(), [child])
1082        self.assertEqual(parent.children(recursive=True), [child, grandchild])
1083        # If the intermediate process is gone there's no way for
1084        # children() to recursively find it.
1085        child.terminate()
1086        child.wait()
1087        self.assertEqual(parent.children(recursive=True), [])
1089    def test_children_duplicates(self):
1090        # find the process which has the highest number of children
1091        table = collections.defaultdict(int)
1092        for p in psutil.process_iter():
1093            try:
1094                table[p.ppid()] += 1
1095            except psutil.Error:
1096                pass
1097        # this is the one, now let's make sure there are no duplicates
1098        pid = sorted(table.items(), key=lambda x: x[1])[-1][0]
1099        if LINUX and pid == 0:
1100            raise self.skipTest("PID 0")
1101        p = psutil.Process(pid)
1102        try:
1103            c = p.children(recursive=True)
1104        except psutil.AccessDenied:  # windows
1105            pass
1106        else:
1107            self.assertEqual(len(c), len(set(c)))
1109    def test_parents_and_children(self):
1110        parent = psutil.Process()
1111        child, grandchild = self.spawn_children_pair()
1112        # forward
1113        children = parent.children(recursive=True)
1114        self.assertEqual(len(children), 2)
1115        self.assertEqual(children[0], child)
1116        self.assertEqual(children[1], grandchild)
1117        # backward
1118        parents = grandchild.parents()
1119        self.assertEqual(parents[0], child)
1120        self.assertEqual(parents[1], parent)
1122    def test_suspend_resume(self):
1123        p = self.spawn_psproc()
1124        p.suspend()
1125        for x in range(100):
1126            if p.status() == psutil.STATUS_STOPPED:
1127                break
1128            time.sleep(0.01)
1129        p.resume()
1130        self.assertNotEqual(p.status(), psutil.STATUS_STOPPED)
1132    def test_invalid_pid(self):
1133        self.assertRaises(TypeError, psutil.Process, "1")
1134        self.assertRaises(ValueError, psutil.Process, -1)
1136    def test_as_dict(self):
1137        p = psutil.Process()
1138        d = p.as_dict(attrs=['exe', 'name'])
1139        self.assertEqual(sorted(d.keys()), ['exe', 'name'])
1141        p = psutil.Process(min(psutil.pids()))
1142        d = p.as_dict(attrs=['connections'], ad_value='foo')
1143        if not isinstance(d['connections'], list):
1144            self.assertEqual(d['connections'], 'foo')
1146        # Test ad_value is set on AccessDenied.
1147        with mock.patch('psutil.Process.nice', create=True,
1148                        side_effect=psutil.AccessDenied):
1149            self.assertEqual(
1150                p.as_dict(attrs=["nice"], ad_value=1), {"nice": 1})
1152        # Test that NoSuchProcess bubbles up.
1153        with mock.patch('psutil.Process.nice', create=True,
1154                        side_effect=psutil.NoSuchProcess(p.pid, "name")):
1155            self.assertRaises(
1156                psutil.NoSuchProcess, p.as_dict, attrs=["nice"])
1158        # Test that ZombieProcess is swallowed.
1159        with mock.patch('psutil.Process.nice', create=True,
1160                        side_effect=psutil.ZombieProcess(p.pid, "name")):
1161            self.assertEqual(
1162                p.as_dict(attrs=["nice"], ad_value="foo"), {"nice": "foo"})
1164        # By default APIs raising NotImplementedError are
1165        # supposed to be skipped.
1166        with mock.patch('psutil.Process.nice', create=True,
1167                        side_effect=NotImplementedError):
1168            d = p.as_dict()
1169            self.assertNotIn('nice', list(d.keys()))
1170            # ...unless the user explicitly asked for some attr.
1171            with self.assertRaises(NotImplementedError):
1172                p.as_dict(attrs=["nice"])
1174        # errors
1175        with self.assertRaises(TypeError):
1176            p.as_dict('name')
1177        with self.assertRaises(ValueError):
1178            p.as_dict(['foo'])
1179        with self.assertRaises(ValueError):
1180            p.as_dict(['foo', 'bar'])
1182    def test_oneshot(self):
1183        p = psutil.Process()
1184        with mock.patch("psutil._psplatform.Process.cpu_times") as m:
1185            with p.oneshot():
1186                p.cpu_times()
1187                p.cpu_times()
1188            self.assertEqual(m.call_count, 1)
1190        with mock.patch("psutil._psplatform.Process.cpu_times") as m:
1191            p.cpu_times()
1192            p.cpu_times()
1193        self.assertEqual(m.call_count, 2)
1195    def test_oneshot_twice(self):
1196        # Test the case where the ctx manager is __enter__ed twice.
1197        # The second __enter__ is supposed to resut in a NOOP.
1198        p = psutil.Process()
1199        with mock.patch("psutil._psplatform.Process.cpu_times") as m1:
1200            with mock.patch("psutil._psplatform.Process.oneshot_enter") as m2:
1201                with p.oneshot():
1202                    p.cpu_times()
1203                    p.cpu_times()
1204                    with p.oneshot():
1205                        p.cpu_times()
1206                        p.cpu_times()
1207                self.assertEqual(m1.call_count, 1)
1208                self.assertEqual(m2.call_count, 1)
1210        with mock.patch("psutil._psplatform.Process.cpu_times") as m:
1211            p.cpu_times()
1212            p.cpu_times()
1213        self.assertEqual(m.call_count, 2)
1215    def test_oneshot_cache(self):
1216        # Make sure oneshot() cache is nonglobal. Instead it's
1217        # supposed to be bound to the Process instance, see:
1218        # https://github.com/giampaolo/psutil/issues/1373
1219        p1, p2 = self.spawn_children_pair()
1220        p1_ppid = p1.ppid()
1221        p2_ppid = p2.ppid()
1222        self.assertNotEqual(p1_ppid, p2_ppid)
1223        with p1.oneshot():
1224            self.assertEqual(p1.ppid(), p1_ppid)
1225            self.assertEqual(p2.ppid(), p2_ppid)
1226        with p2.oneshot():
1227            self.assertEqual(p1.ppid(), p1_ppid)
1228            self.assertEqual(p2.ppid(), p2_ppid)
1230    def test_halfway_terminated_process(self):
1231        # Test that NoSuchProcess exception gets raised in case the
1232        # process dies after we create the Process object.
1233        # Example:
1234        # >>> proc = Process(1234)
1235        # >>> time.sleep(2)  # time-consuming task, process dies in meantime
1236        # >>> proc.name()
1237        # Refers to Issue #15
1238        def assert_raises_nsp(fun, fun_name):
1239            try:
1240                ret = fun()
1241            except psutil.ZombieProcess:  # differentiate from NSP
1242                raise
1243            except psutil.NoSuchProcess:
1244                pass
1245            except psutil.AccessDenied:
1246                if OPENBSD and fun_name in ('threads', 'num_threads'):
1247                    return
1248                raise
1249            else:
1250                # NtQuerySystemInformation succeeds even if process is gone.
1251                if WINDOWS and fun_name in ('exe', 'name'):
1252                    return
1253                raise self.fail("%r didn't raise NSP and returned %r "
1254                                "instead" % (fun, ret))
1256        p = self.spawn_psproc()
1257        p.terminate()
1258        p.wait()
1259        if WINDOWS:  # XXX
1260            call_until(psutil.pids, "%s not in ret" % p.pid)
1261        self.assertProcessGone(p)
1263        ns = process_namespace(p)
1264        for fun, name in ns.iter(ns.all):
1265            assert_raises_nsp(fun, name)
1267        # NtQuerySystemInformation succeeds even if process is gone.
1268        if WINDOWS and not GITHUB_ACTIONS:
1269            normcase = os.path.normcase
1270            self.assertEqual(normcase(p.exe()), normcase(PYTHON_EXE))
1272    @unittest.skipIf(not POSIX, 'POSIX only')
1273    def test_zombie_process(self):
1274        def succeed_or_zombie_p_exc(fun):
1275            try:
1276                return fun()
1277            except (psutil.ZombieProcess, psutil.AccessDenied):
1278                pass
1280        parent, zombie = self.spawn_zombie()
1281        # A zombie process should always be instantiable
1282        zproc = psutil.Process(zombie.pid)
1283        # ...and at least its status always be querable
1284        self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
1285        # ...and it should be considered 'running'
1286        assert zproc.is_running()
1287        # ...and as_dict() shouldn't crash
1288        zproc.as_dict()
1289        # ...its parent should 'see' it (edit: not true on BSD and MACOS
1290        # descendants = [x.pid for x in psutil.Process().children(
1291        #                recursive=True)]
1292        # self.assertIn(zpid, descendants)
1293        # XXX should we also assume ppid be usable?  Note: this
1294        # would be an important use case as the only way to get
1295        # rid of a zombie is to kill its parent.
1296        # self.assertEqual(zpid.ppid(), os.getpid())
1297        # ...and all other APIs should be able to deal with it
1299        ns = process_namespace(zproc)
1300        for fun, name in ns.iter(ns.all):
1301            succeed_or_zombie_p_exc(fun)
1303        assert psutil.pid_exists(zproc.pid)
1304        self.assertIn(zproc.pid, psutil.pids())
1305        self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()])
1306        psutil._pmap = {}
1307        self.assertIn(zproc.pid, [x.pid for x in psutil.process_iter()])
1309    @unittest.skipIf(not POSIX, 'POSIX only')
1310    def test_zombie_process_is_running_w_exc(self):
1311        # Emulate a case where internally is_running() raises
1312        # ZombieProcess.
1313        p = psutil.Process()
1314        with mock.patch("psutil.Process",
1315                        side_effect=psutil.ZombieProcess(0)) as m:
1316            assert p.is_running()
1317            assert m.called
1319    @unittest.skipIf(not POSIX, 'POSIX only')
1320    def test_zombie_process_status_w_exc(self):
1321        # Emulate a case where internally status() raises
1322        # ZombieProcess.
1323        p = psutil.Process()
1324        with mock.patch("psutil._psplatform.Process.status",
1325                        side_effect=psutil.ZombieProcess(0)) as m:
1326            self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
1327            assert m.called
1329    def test_pid_0(self):
1330        # Process(0) is supposed to work on all platforms except Linux
1331        if 0 not in psutil.pids():
1332            self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
1333            # These 2 are a contradiction, but "ps" says PID 1's parent
1334            # is PID 0.
1335            assert not psutil.pid_exists(0)
1336            self.assertEqual(psutil.Process(1).ppid(), 0)
1337            return
1339        p = psutil.Process(0)
1340        exc = psutil.AccessDenied if WINDOWS else ValueError
1341        self.assertRaises(exc, p.wait)
1342        self.assertRaises(exc, p.terminate)
1343        self.assertRaises(exc, p.suspend)
1344        self.assertRaises(exc, p.resume)
1345        self.assertRaises(exc, p.kill)
1346        self.assertRaises(exc, p.send_signal, signal.SIGTERM)
1348        # test all methods
1349        ns = process_namespace(p)
1350        for fun, name in ns.iter(ns.getters + ns.setters):
1351            try:
1352                ret = fun()
1353            except psutil.AccessDenied:
1354                pass
1355            else:
1356                if name in ("uids", "gids"):
1357                    self.assertEqual(ret.real, 0)
1358                elif name == "username":
1359                    user = 'NT AUTHORITY\\SYSTEM' if WINDOWS else 'root'
1360                    self.assertEqual(p.username(), user)
1361                elif name == "name":
1362                    assert name, name
1364        if not OPENBSD:
1365            self.assertIn(0, psutil.pids())
1366            assert psutil.pid_exists(0)
1368    @unittest.skipIf(not HAS_ENVIRON, "not supported")
1369    def test_environ(self):
1370        def clean_dict(d):
1371            # Most of these are problematic on Travis.
1372            d.pop("PSUTIL_TESTING", None)
1373            d.pop("PLAT", None)
1374            d.pop("HOME", None)
1375            if MACOS:
1376                d.pop("__CF_USER_TEXT_ENCODING", None)
1377                d.pop("VERSIONER_PYTHON_PREFER_32_BIT", None)
1378                d.pop("VERSIONER_PYTHON_VERSION", None)
1379            return dict(
1380                [(k.replace("\r", "").replace("\n", ""),
1381                  v.replace("\r", "").replace("\n", ""))
1382                 for k, v in d.items()])
1384        self.maxDiff = None
1385        p = psutil.Process()
1386        d1 = clean_dict(p.environ())
1387        d2 = clean_dict(os.environ.copy())
1388        if not OSX and GITHUB_ACTIONS:
1389            self.assertEqual(d1, d2)
1391    @unittest.skipIf(not HAS_ENVIRON, "not supported")
1392    @unittest.skipIf(not POSIX, "POSIX only")
1393    def test_weird_environ(self):
1394        # environment variables can contain values without an equals sign
1395        code = textwrap.dedent("""
1396            #include <unistd.h>
1397            #include <fcntl.h>
1398            char * const argv[] = {"cat", 0};
1399            char * const envp[] = {"A=1", "X", "C=3", 0};
1400            int main(void) {
1401                /* Close stderr on exec so parent can wait for the execve to
1402                 * finish. */
1403                if (fcntl(2, F_SETFD, FD_CLOEXEC) != 0)
1404                    return 0;
1405                return execve("/bin/cat", argv, envp);
1406            }
1407            """)
1408        path = self.get_testfn()
1409        create_exe(path, c_code=code)
1410        sproc = self.spawn_testproc(
1411            [path], stdin=subprocess.PIPE, stderr=subprocess.PIPE)
1412        p = psutil.Process(sproc.pid)
1413        wait_for_pid(p.pid)
1414        assert p.is_running()
1415        # Wait for process to exec or exit.
1416        self.assertEqual(sproc.stderr.read(), b"")
1417        if MACOS and CI_TESTING:
1418            try:
1419                env = p.environ()
1420            except psutil.AccessDenied:
1421                # XXX: fails sometimes with:
1422                # PermissionError from 'sysctl(KERN_PROCARGS2) -> EIO'
1423                return
1424        else:
1425            env = p.environ()
1426        self.assertEqual(env, {"A": "1", "C": "3"})
1427        sproc.communicate()
1428        self.assertEqual(sproc.returncode, 0)
1431# ===================================================================
1432# --- Limited user tests
1433# ===================================================================
if POSIX and os.getuid() == 0:

    class LimitedUserTestCase(TestProcess):
        """Repeat the previous tests by using a limited user.
        Executed only on UNIX and only if the user who run the test script
        is root.
        """
        # the uid/gid the test suite runs under
        PROCESS_UID = os.getuid()
        PROCESS_GID = os.getgid()

        def __init__(self, *args, **kwargs):
            super().__init__(*args, **kwargs)
            # re-define all existent test methods in order to
            # ignore AccessDenied exceptions
            for attr in [x for x in dir(self) if x.startswith('test')]:
                meth = getattr(self, attr)

                # NOTE(review): `meth` is looked up when test_() runs,
                # not when it is defined, so every wrapper ends up
                # calling the method bound in the last iteration of
                # this loop. Binding it per iteration would make the
                # whole suite really run as the limited user; confirm
                # that is wanted before changing it.
                def test_(self):
                    try:
                        meth()
                    except psutil.AccessDenied:
                        pass
                setattr(self, attr, types.MethodType(test_, self))

        def setUp(self):
            super().setUp()
            # drop the effective group first, then the effective user
            os.setegid(1000)
            os.seteuid(1000)

        def tearDown(self):
            # Undo setUp() in reverse order, giving each call the id of
            # its own kind (the uid to seteuid(), the gid to setegid()).
            os.seteuid(self.PROCESS_UID)
            os.setegid(self.PROCESS_GID)
            super().tearDown()

        def test_nice(self):
            # a limited user must not be able to raise its priority
            try:
                psutil.Process().nice(-1)
            except psutil.AccessDenied:
                pass
            else:
                self.fail("exception not raised")

        @unittest.skipIf(1, "causes problem as root")
        def test_zombie_process(self):
            pass
1485# ===================================================================
1486# --- psutil.Popen tests
1487# ===================================================================
class TestPopen(PsutilTestCase):
    """Tests for psutil.Popen class."""

    @classmethod
    def tearDownClass(cls):
        reap_children()

    def test_misc(self):
        # XXX this test causes a ResourceWarning on Python 3 because
        # psutil.__subproc instance doesn't get properly freed.
        # Not sure what to do though.
        sleeper = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
        with psutil.Popen(sleeper, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
            proc.name()
            proc.cpu_times()
            # plain attribute access: it just has to not raise
            proc.stdin
            self.assertTrue(dir(proc))
            self.assertRaises(AttributeError, getattr, proc, 'foo')
            proc.terminate()
        expected = -signal.SIGTERM if POSIX else signal.SIGTERM
        self.assertEqual(proc.wait(5), expected)

    def test_ctx_manager(self):
        # Leaving the "with" block must close all three pipes.
        pipes = dict(stdout=subprocess.PIPE,
                     stderr=subprocess.PIPE,
                     stdin=subprocess.PIPE)
        with psutil.Popen([PYTHON_EXE, "-V"], **pipes) as proc:
            proc.communicate()
        for stream in (proc.stdout, proc.stderr, proc.stdin):
            assert stream.closed
        self.assertEqual(proc.returncode, 0)

    def test_kill_terminate(self):
        # subprocess.Popen()'s terminate(), kill() and send_signal() do
        # not raise exception after the process is gone. psutil.Popen
        # diverges from that.
        sleeper = [PYTHON_EXE, "-c", "import time; time.sleep(60);"]
        with psutil.Popen(sleeper, stdout=subprocess.PIPE,
                          stderr=subprocess.PIPE) as proc:
            proc.terminate()
            proc.wait()
            gone = psutil.NoSuchProcess
            self.assertRaises(gone, proc.terminate)
            self.assertRaises(gone, proc.kill)
            signals = [signal.SIGTERM]
            if WINDOWS and sys.version_info >= (2, 7):
                signals += [signal.CTRL_C_EVENT, signal.CTRL_BREAK_EVENT]
            for sig in signals:
                self.assertRaises(gone, proc.send_signal, sig)
if __name__ == '__main__':
    # Run this module through psutil's own test runner.
    from psutil.tests.runner import run_from_name as _run_from_name
    _run_from_name(__file__)