1#!/usr/bin/env python3
2
3# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7"""Linux specific tests."""
8
9from __future__ import division
10import collections
11import contextlib
12import errno
13import glob
14import io
15import os
16import re
17import shutil
18import socket
19import struct
20import tempfile
21import textwrap
22import time
23import warnings
24
25import psutil
26from psutil import LINUX
27from psutil._compat import basestring
28from psutil._compat import FileNotFoundError
29from psutil._compat import PY3
30from psutil._compat import u
31from psutil.tests import call_until
32from psutil.tests import HAS_BATTERY
33from psutil.tests import HAS_CPU_FREQ
34from psutil.tests import HAS_GETLOADAVG
35from psutil.tests import HAS_RLIMIT
36from psutil.tests import MEMORY_TOLERANCE
37from psutil.tests import mock
38from psutil.tests import PYPY
39from psutil.tests import pyrun
40from psutil.tests import reap_children
41from psutil.tests import reload_module
42from psutil.tests import retry_on_failure
43from psutil.tests import safe_rmpath
44from psutil.tests import sh
45from psutil.tests import skip_on_not_implemented
46from psutil.tests import TESTFN
47from psutil.tests import ThreadTask
48from psutil.tests import TRAVIS
49from psutil.tests import unittest
50from psutil.tests import which
51
52
53HERE = os.path.abspath(os.path.dirname(__file__))
54SIOCGIFADDR = 0x8915
55SIOCGIFCONF = 0x8912
56SIOCGIFHWADDR = 0x8927
57if LINUX:
58    SECTOR_SIZE = 512
59EMPTY_TEMPERATURES = not glob.glob('/sys/class/hwmon/hwmon*')
60
61# =====================================================================
62# --- utils
63# =====================================================================
64
65
66def get_ipv4_address(ifname):
67    import fcntl
68    ifname = ifname[:15]
69    if PY3:
70        ifname = bytes(ifname, 'ascii')
71    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
72    with contextlib.closing(s):
73        return socket.inet_ntoa(
74            fcntl.ioctl(s.fileno(),
75                        SIOCGIFADDR,
76                        struct.pack('256s', ifname))[20:24])
77
78
79def get_mac_address(ifname):
80    import fcntl
81    ifname = ifname[:15]
82    if PY3:
83        ifname = bytes(ifname, 'ascii')
84    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
85    with contextlib.closing(s):
86        info = fcntl.ioctl(
87            s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
88        if PY3:
89            def ord(x):
90                return x
91        else:
92            import __builtin__
93            ord = __builtin__.ord
94        return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
95
96
97def free_swap():
98    """Parse 'free' cmd and return swap memory's s total, used and free
99    values.
100    """
101    out = sh('free -b', env={"LANG": "C.UTF-8"})
102    lines = out.split('\n')
103    for line in lines:
104        if line.startswith('Swap'):
105            _, total, used, free = line.split()
106            nt = collections.namedtuple('free', 'total used free')
107            return nt(int(total), int(used), int(free))
108    raise ValueError(
109        "can't find 'Swap' in 'free' output:\n%s" % '\n'.join(lines))
110
111
112def free_physmem():
113    """Parse 'free' cmd and return physical memory's total, used
114    and free values.
115    """
116    # Note: free can have 2 different formats, invalidating 'shared'
117    # and 'cached' memory which may have different positions so we
118    # do not return them.
119    # https://github.com/giampaolo/psutil/issues/538#issuecomment-57059946
120    out = sh('free -b', env={"LANG": "C.UTF-8"})
121    lines = out.split('\n')
122    for line in lines:
123        if line.startswith('Mem'):
124            total, used, free, shared = \
125                [int(x) for x in line.split()[1:5]]
126            nt = collections.namedtuple(
127                'free', 'total used free shared output')
128            return nt(total, used, free, shared, out)
129    raise ValueError(
130        "can't find 'Mem' in 'free' output:\n%s" % '\n'.join(lines))
131
132
133def vmstat(stat):
134    out = sh("vmstat -s", env={"LANG": "C.UTF-8"})
135    for line in out.split("\n"):
136        line = line.strip()
137        if stat in line:
138            return int(line.split(' ')[0])
139    raise ValueError("can't find %r in 'vmstat' output" % stat)
140
141
142def get_free_version_info():
143    out = sh("free -V").strip()
144    return tuple(map(int, out.split()[-1].split('.')))
145
146
147@contextlib.contextmanager
148def mock_open_content(for_path, content):
149    """Mock open() builtin and forces it to return a certain `content`
150    on read() if the path being opened matches `for_path`.
151    """
152    def open_mock(name, *args, **kwargs):
153        if name == for_path:
154            if PY3:
155                if isinstance(content, basestring):
156                    return io.StringIO(content)
157                else:
158                    return io.BytesIO(content)
159            else:
160                return io.BytesIO(content)
161        else:
162            return orig_open(name, *args, **kwargs)
163
164    orig_open = open
165    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
166    with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
167        yield m
168
169
170@contextlib.contextmanager
171def mock_open_exception(for_path, exc):
172    """Mock open() builtin and raises `exc` if the path being opened
173    matches `for_path`.
174    """
175    def open_mock(name, *args, **kwargs):
176        if name == for_path:
177            raise exc
178        else:
179            return orig_open(name, *args, **kwargs)
180
181    orig_open = open
182    patch_point = 'builtins.open' if PY3 else '__builtin__.open'
183    with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
184        yield m
185
186
187# =====================================================================
188# --- system virtual memory
189# =====================================================================
190
191
192@unittest.skipIf(not LINUX, "LINUX only")
193class TestSystemVirtualMemory(unittest.TestCase):
194
195    def test_total(self):
196        # free_value = free_physmem().total
197        # psutil_value = psutil.virtual_memory().total
198        # self.assertEqual(free_value, psutil_value)
199        vmstat_value = vmstat('total memory') * 1024
200        psutil_value = psutil.virtual_memory().total
201        self.assertAlmostEqual(vmstat_value, psutil_value)
202
203    # Older versions of procps used slab memory to calculate used memory.
204    # This got changed in:
205    # https://gitlab.com/procps-ng/procps/commit/
206    #     05d751c4f076a2f0118b914c5e51cfbb4762ad8e
207    @unittest.skipIf(LINUX and get_free_version_info() < (3, 3, 12),
208                     "old free version")
209    @retry_on_failure()
210    def test_used(self):
211        free = free_physmem()
212        free_value = free.used
213        psutil_value = psutil.virtual_memory().used
214        self.assertAlmostEqual(
215            free_value, psutil_value, delta=MEMORY_TOLERANCE,
216            msg='%s %s \n%s' % (free_value, psutil_value, free.output))
217
218    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
219    @retry_on_failure()
220    def test_free(self):
221        vmstat_value = vmstat('free memory') * 1024
222        psutil_value = psutil.virtual_memory().free
223        self.assertAlmostEqual(
224            vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
225
226    @retry_on_failure()
227    def test_buffers(self):
228        vmstat_value = vmstat('buffer memory') * 1024
229        psutil_value = psutil.virtual_memory().buffers
230        self.assertAlmostEqual(
231            vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
232
233    # https://travis-ci.org/giampaolo/psutil/jobs/226719664
234    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
235    @retry_on_failure()
236    def test_active(self):
237        vmstat_value = vmstat('active memory') * 1024
238        psutil_value = psutil.virtual_memory().active
239        self.assertAlmostEqual(
240            vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
241
242    # https://travis-ci.org/giampaolo/psutil/jobs/227242952
243    @unittest.skipIf(TRAVIS, "unreliable on TRAVIS")
244    @retry_on_failure()
245    def test_inactive(self):
246        vmstat_value = vmstat('inactive memory') * 1024
247        psutil_value = psutil.virtual_memory().inactive
248        self.assertAlmostEqual(
249            vmstat_value, psutil_value, delta=MEMORY_TOLERANCE)
250
251    @retry_on_failure()
252    def test_shared(self):
253        free = free_physmem()
254        free_value = free.shared
255        if free_value == 0:
256            raise unittest.SkipTest("free does not support 'shared' column")
257        psutil_value = psutil.virtual_memory().shared
258        self.assertAlmostEqual(
259            free_value, psutil_value, delta=MEMORY_TOLERANCE,
260            msg='%s %s \n%s' % (free_value, psutil_value, free.output))
261
262    @retry_on_failure()
263    def test_available(self):
264        # "free" output format has changed at some point:
265        # https://github.com/giampaolo/psutil/issues/538#issuecomment-147192098
266        out = sh("free -b")
267        lines = out.split('\n')
268        if 'available' not in lines[0]:
269            raise unittest.SkipTest("free does not support 'available' column")
270        else:
271            free_value = int(lines[1].split()[-1])
272            psutil_value = psutil.virtual_memory().available
273            self.assertAlmostEqual(
274                free_value, psutil_value, delta=MEMORY_TOLERANCE,
275                msg='%s %s \n%s' % (free_value, psutil_value, out))
276
277    def test_warnings_on_misses(self):
278        # Emulate a case where /proc/meminfo provides few info.
279        # psutil is supposed to set the missing fields to 0 and
280        # raise a warning.
281        with mock_open_content(
282            '/proc/meminfo',
283            textwrap.dedent("""\
284                Active(anon):    6145416 kB
285                Active(file):    2950064 kB
286                Inactive(anon):   574764 kB
287                Inactive(file):  1567648 kB
288                MemAvailable:         -1 kB
289                MemFree:         2057400 kB
290                MemTotal:       16325648 kB
291                SReclaimable:     346648 kB
292                """).encode()) as m:
293            with warnings.catch_warnings(record=True) as ws:
294                warnings.simplefilter("always")
295                ret = psutil.virtual_memory()
296                assert m.called
297                self.assertEqual(len(ws), 1)
298                w = ws[0]
299                assert w.filename.endswith('psutil/_pslinux.py')
300                self.assertIn(
301                    "memory stats couldn't be determined", str(w.message))
302                self.assertIn("cached", str(w.message))
303                self.assertIn("shared", str(w.message))
304                self.assertIn("active", str(w.message))
305                self.assertIn("inactive", str(w.message))
306                self.assertIn("buffers", str(w.message))
307                self.assertIn("available", str(w.message))
308                self.assertEqual(ret.cached, 0)
309                self.assertEqual(ret.active, 0)
310                self.assertEqual(ret.inactive, 0)
311                self.assertEqual(ret.shared, 0)
312                self.assertEqual(ret.buffers, 0)
313                self.assertEqual(ret.available, 0)
314                self.assertEqual(ret.slab, 0)
315
316    @retry_on_failure()
317    def test_avail_old_percent(self):
318        # Make sure that our calculation of avail mem for old kernels
319        # is off by max 15%.
320        from psutil._pslinux import calculate_avail_vmem
321        from psutil._pslinux import open_binary
322
323        mems = {}
324        with open_binary('/proc/meminfo') as f:
325            for line in f:
326                fields = line.split()
327                mems[fields[0]] = int(fields[1]) * 1024
328
329        a = calculate_avail_vmem(mems)
330        if b'MemAvailable:' in mems:
331            b = mems[b'MemAvailable:']
332            diff_percent = abs(a - b) / a * 100
333            self.assertLess(diff_percent, 15)
334
335    def test_avail_old_comes_from_kernel(self):
336        # Make sure "MemAvailable:" coluimn is used instead of relying
337        # on our internal algorithm to calculate avail mem.
338        with mock_open_content(
339            '/proc/meminfo',
340            textwrap.dedent("""\
341                Active:          9444728 kB
342                Active(anon):    6145416 kB
343                Active(file):    2950064 kB
344                Buffers:          287952 kB
345                Cached:          4818144 kB
346                Inactive(file):  1578132 kB
347                Inactive(anon):   574764 kB
348                Inactive(file):  1567648 kB
349                MemAvailable:    6574984 kB
350                MemFree:         2057400 kB
351                MemTotal:       16325648 kB
352                Shmem:            577588 kB
353                SReclaimable:     346648 kB
354                """).encode()) as m:
355            with warnings.catch_warnings(record=True) as ws:
356                ret = psutil.virtual_memory()
357            assert m.called
358            self.assertEqual(ret.available, 6574984 * 1024)
359            w = ws[0]
360            self.assertIn(
361                "inactive memory stats couldn't be determined", str(w.message))
362
363    def test_avail_old_missing_fields(self):
364        # Remove Active(file), Inactive(file) and SReclaimable
365        # from /proc/meminfo and make sure the fallback is used
366        # (free + cached),
367        with mock_open_content(
368            "/proc/meminfo",
369            textwrap.dedent("""\
370                    Active:          9444728 kB
371                    Active(anon):    6145416 kB
372                    Buffers:          287952 kB
373                    Cached:          4818144 kB
374                    Inactive(file):  1578132 kB
375                    Inactive(anon):   574764 kB
376                    MemFree:         2057400 kB
377                    MemTotal:       16325648 kB
378                    Shmem:            577588 kB
379                    """).encode()) as m:
380            with warnings.catch_warnings(record=True) as ws:
381                ret = psutil.virtual_memory()
382            assert m.called
383            self.assertEqual(ret.available, 2057400 * 1024 + 4818144 * 1024)
384            w = ws[0]
385            self.assertIn(
386                "inactive memory stats couldn't be determined", str(w.message))
387
388    def test_avail_old_missing_zoneinfo(self):
389        # Remove /proc/zoneinfo file. Make sure fallback is used
390        # (free + cached).
391        with mock_open_content(
392                "/proc/meminfo",
393                textwrap.dedent("""\
394                    Active:          9444728 kB
395                    Active(anon):    6145416 kB
396                    Active(file):    2950064 kB
397                    Buffers:          287952 kB
398                    Cached:          4818144 kB
399                    Inactive(file):  1578132 kB
400                    Inactive(anon):   574764 kB
401                    Inactive(file):  1567648 kB
402                    MemFree:         2057400 kB
403                    MemTotal:       16325648 kB
404                    Shmem:            577588 kB
405                    SReclaimable:     346648 kB
406                    """).encode()):
407            with mock_open_exception(
408                    "/proc/zoneinfo",
409                    IOError(errno.ENOENT, 'no such file or directory')):
410                with warnings.catch_warnings(record=True) as ws:
411                    ret = psutil.virtual_memory()
412                    self.assertEqual(
413                        ret.available, 2057400 * 1024 + 4818144 * 1024)
414                    w = ws[0]
415                    self.assertIn(
416                        "inactive memory stats couldn't be determined",
417                        str(w.message))
418
419    def test_virtual_memory_mocked(self):
420        # Emulate /proc/meminfo because neither vmstat nor free return slab.
421        def open_mock(name, *args, **kwargs):
422            if name == '/proc/meminfo':
423                return io.BytesIO(textwrap.dedent("""\
424                    MemTotal:              100 kB
425                    MemFree:               2 kB
426                    MemAvailable:          3 kB
427                    Buffers:               4 kB
428                    Cached:                5 kB
429                    SwapCached:            6 kB
430                    Active:                7 kB
431                    Inactive:              8 kB
432                    Active(anon):          9 kB
433                    Inactive(anon):        10 kB
434                    Active(file):          11 kB
435                    Inactive(file):        12 kB
436                    Unevictable:           13 kB
437                    Mlocked:               14 kB
438                    SwapTotal:             15 kB
439                    SwapFree:              16 kB
440                    Dirty:                 17 kB
441                    Writeback:             18 kB
442                    AnonPages:             19 kB
443                    Mapped:                20 kB
444                    Shmem:                 21 kB
445                    Slab:                  22 kB
446                    SReclaimable:          23 kB
447                    SUnreclaim:            24 kB
448                    KernelStack:           25 kB
449                    PageTables:            26 kB
450                    NFS_Unstable:          27 kB
451                    Bounce:                28 kB
452                    WritebackTmp:          29 kB
453                    CommitLimit:           30 kB
454                    Committed_AS:          31 kB
455                    VmallocTotal:          32 kB
456                    VmallocUsed:           33 kB
457                    VmallocChunk:          34 kB
458                    HardwareCorrupted:     35 kB
459                    AnonHugePages:         36 kB
460                    ShmemHugePages:        37 kB
461                    ShmemPmdMapped:        38 kB
462                    CmaTotal:              39 kB
463                    CmaFree:               40 kB
464                    HugePages_Total:       41 kB
465                    HugePages_Free:        42 kB
466                    HugePages_Rsvd:        43 kB
467                    HugePages_Surp:        44 kB
468                    Hugepagesize:          45 kB
469                    DirectMap46k:          46 kB
470                    DirectMap47M:          47 kB
471                    DirectMap48G:          48 kB
472                    """).encode())
473            else:
474                return orig_open(name, *args, **kwargs)
475
476        orig_open = open
477        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
478        with mock.patch(patch_point, create=True, side_effect=open_mock) as m:
479            mem = psutil.virtual_memory()
480            assert m.called
481            self.assertEqual(mem.total, 100 * 1024)
482            self.assertEqual(mem.free, 2 * 1024)
483            self.assertEqual(mem.buffers, 4 * 1024)
484            # cached mem also includes reclaimable memory
485            self.assertEqual(mem.cached, (5 + 23) * 1024)
486            self.assertEqual(mem.shared, 21 * 1024)
487            self.assertEqual(mem.active, 7 * 1024)
488            self.assertEqual(mem.inactive, 8 * 1024)
489            self.assertEqual(mem.slab, 22 * 1024)
490            self.assertEqual(mem.available, 3 * 1024)
491
492
493# =====================================================================
494# --- system swap memory
495# =====================================================================
496
497
498@unittest.skipIf(not LINUX, "LINUX only")
499class TestSystemSwapMemory(unittest.TestCase):
500
501    @staticmethod
502    def meminfo_has_swap_info():
503        """Return True if /proc/meminfo provides swap metrics."""
504        with open("/proc/meminfo") as f:
505            data = f.read()
506        return 'SwapTotal:' in data and 'SwapFree:' in data
507
508    def test_total(self):
509        free_value = free_swap().total
510        psutil_value = psutil.swap_memory().total
511        return self.assertAlmostEqual(
512            free_value, psutil_value, delta=MEMORY_TOLERANCE)
513
514    @retry_on_failure()
515    def test_used(self):
516        free_value = free_swap().used
517        psutil_value = psutil.swap_memory().used
518        return self.assertAlmostEqual(
519            free_value, psutil_value, delta=MEMORY_TOLERANCE)
520
521    @retry_on_failure()
522    def test_free(self):
523        free_value = free_swap().free
524        psutil_value = psutil.swap_memory().free
525        return self.assertAlmostEqual(
526            free_value, psutil_value, delta=MEMORY_TOLERANCE)
527
528    def test_missing_sin_sout(self):
529        with mock.patch('psutil._common.open', create=True) as m:
530            with warnings.catch_warnings(record=True) as ws:
531                warnings.simplefilter("always")
532                ret = psutil.swap_memory()
533                assert m.called
534                self.assertEqual(len(ws), 1)
535                w = ws[0]
536                assert w.filename.endswith('psutil/_pslinux.py')
537                self.assertIn(
538                    "'sin' and 'sout' swap memory stats couldn't "
539                    "be determined", str(w.message))
540                self.assertEqual(ret.sin, 0)
541                self.assertEqual(ret.sout, 0)
542
543    def test_no_vmstat_mocked(self):
544        # see https://github.com/giampaolo/psutil/issues/722
545        with mock_open_exception(
546                "/proc/vmstat",
547                IOError(errno.ENOENT, 'no such file or directory')) as m:
548            with warnings.catch_warnings(record=True) as ws:
549                warnings.simplefilter("always")
550                ret = psutil.swap_memory()
551                assert m.called
552                self.assertEqual(len(ws), 1)
553                w = ws[0]
554                assert w.filename.endswith('psutil/_pslinux.py')
555                self.assertIn(
556                    "'sin' and 'sout' swap memory stats couldn't "
557                    "be determined and were set to 0",
558                    str(w.message))
559                self.assertEqual(ret.sin, 0)
560                self.assertEqual(ret.sout, 0)
561
562    def test_meminfo_against_sysinfo(self):
563        # Make sure the content of /proc/meminfo about swap memory
564        # matches sysinfo() syscall, see:
565        # https://github.com/giampaolo/psutil/issues/1015
566        if not self.meminfo_has_swap_info():
567            return unittest.skip("/proc/meminfo has no swap metrics")
568        with mock.patch('psutil._pslinux.cext.linux_sysinfo') as m:
569            swap = psutil.swap_memory()
570        assert not m.called
571        import psutil._psutil_linux as cext
572        _, _, _, _, total, free, unit_multiplier = cext.linux_sysinfo()
573        total *= unit_multiplier
574        free *= unit_multiplier
575        self.assertEqual(swap.total, total)
576        self.assertAlmostEqual(swap.free, free, delta=MEMORY_TOLERANCE)
577
578    def test_emulate_meminfo_has_no_metrics(self):
579        # Emulate a case where /proc/meminfo provides no swap metrics
580        # in which case sysinfo() syscall is supposed to be used
581        # as a fallback.
582        with mock_open_content("/proc/meminfo", b"") as m:
583            psutil.swap_memory()
584            assert m.called
585
586
587# =====================================================================
588# --- system CPU
589# =====================================================================
590
591
592@unittest.skipIf(not LINUX, "LINUX only")
593class TestSystemCPUTimes(unittest.TestCase):
594
595    @unittest.skipIf(TRAVIS, "unknown failure on travis")
596    def test_fields(self):
597        fields = psutil.cpu_times()._fields
598        kernel_ver = re.findall(r'\d+\.\d+\.\d+', os.uname()[2])[0]
599        kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
600        if kernel_ver_info >= (2, 6, 11):
601            self.assertIn('steal', fields)
602        else:
603            self.assertNotIn('steal', fields)
604        if kernel_ver_info >= (2, 6, 24):
605            self.assertIn('guest', fields)
606        else:
607            self.assertNotIn('guest', fields)
608        if kernel_ver_info >= (3, 2, 0):
609            self.assertIn('guest_nice', fields)
610        else:
611            self.assertNotIn('guest_nice', fields)
612
613
614@unittest.skipIf(not LINUX, "LINUX only")
615class TestSystemCPUCountLogical(unittest.TestCase):
616
617    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu/online"),
618                     "/sys/devices/system/cpu/online does not exist")
619    def test_against_sysdev_cpu_online(self):
620        with open("/sys/devices/system/cpu/online") as f:
621            value = f.read().strip()
622        if "-" in str(value):
623            value = int(value.split('-')[1]) + 1
624            self.assertEqual(psutil.cpu_count(), value)
625
626    @unittest.skipIf(not os.path.exists("/sys/devices/system/cpu"),
627                     "/sys/devices/system/cpu does not exist")
628    def test_against_sysdev_cpu_num(self):
629        ls = os.listdir("/sys/devices/system/cpu")
630        count = len([x for x in ls if re.search(r"cpu\d+$", x) is not None])
631        self.assertEqual(psutil.cpu_count(), count)
632
633    @unittest.skipIf(not which("nproc"), "nproc utility not available")
634    def test_against_nproc(self):
635        num = int(sh("nproc --all"))
636        self.assertEqual(psutil.cpu_count(logical=True), num)
637
638    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
639    def test_against_lscpu(self):
640        out = sh("lscpu -p")
641        num = len([x for x in out.split('\n') if not x.startswith('#')])
642        self.assertEqual(psutil.cpu_count(logical=True), num)
643
644    def test_emulate_fallbacks(self):
645        import psutil._pslinux
646        original = psutil._pslinux.cpu_count_logical()
647        # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
648        # order to cause the parsing of /proc/cpuinfo and /proc/stat.
649        with mock.patch(
650                'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
651            self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
652            assert m.called
653
654            # Let's have open() return emtpy data and make sure None is
655            # returned ('cause we mimick os.cpu_count()).
656            with mock.patch('psutil._common.open', create=True) as m:
657                self.assertIsNone(psutil._pslinux.cpu_count_logical())
658                self.assertEqual(m.call_count, 2)
659                # /proc/stat should be the last one
660                self.assertEqual(m.call_args[0][0], '/proc/stat')
661
662            # Let's push this a bit further and make sure /proc/cpuinfo
663            # parsing works as expected.
664            with open('/proc/cpuinfo', 'rb') as f:
665                cpuinfo_data = f.read()
666            fake_file = io.BytesIO(cpuinfo_data)
667            with mock.patch('psutil._common.open',
668                            return_value=fake_file, create=True) as m:
669                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
670
671            # Finally, let's make /proc/cpuinfo return meaningless data;
672            # this way we'll fall back on relying on /proc/stat
673            with mock_open_content('/proc/cpuinfo', b"") as m:
674                self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
675                m.called
676
677
678@unittest.skipIf(not LINUX, "LINUX only")
679class TestSystemCPUCountPhysical(unittest.TestCase):
680
681    @unittest.skipIf(not which("lscpu"), "lscpu utility not available")
682    def test_against_lscpu(self):
683        out = sh("lscpu -p")
684        core_ids = set()
685        for line in out.split('\n'):
686            if not line.startswith('#'):
687                fields = line.split(',')
688                core_ids.add(fields[1])
689        self.assertEqual(psutil.cpu_count(logical=False), len(core_ids))
690
691    def test_emulate_none(self):
692        with mock.patch('glob.glob', return_value=[]) as m1:
693            with mock.patch('psutil._common.open', create=True) as m2:
694                self.assertIsNone(psutil._pslinux.cpu_count_physical())
695        assert m1.called
696        assert m2.called
697
698
699@unittest.skipIf(not LINUX, "LINUX only")
700class TestSystemCPUFrequency(unittest.TestCase):
701
702    @unittest.skipIf(TRAVIS, "fails on Travis")
703    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
704    def test_emulate_use_second_file(self):
705        # https://github.com/giampaolo/psutil/issues/981
706        def path_exists_mock(path):
707            if path.startswith("/sys/devices/system/cpu/cpufreq/policy"):
708                return False
709            else:
710                return orig_exists(path)
711
712        orig_exists = os.path.exists
713        with mock.patch("os.path.exists", side_effect=path_exists_mock,
714                        create=True):
715            assert psutil.cpu_freq()
716
717    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
718    def test_emulate_use_cpuinfo(self):
719        # Emulate a case where /sys/devices/system/cpu/cpufreq* does not
720        # exist and /proc/cpuinfo is used instead.
721        def path_exists_mock(path):
722            if path.startswith('/sys/devices/system/cpu/'):
723                return False
724            else:
725                if path == "/proc/cpuinfo":
726                    flags.append(None)
727                return os_path_exists(path)
728
729        flags = []
730        os_path_exists = os.path.exists
731        try:
732            with mock.patch("os.path.exists", side_effect=path_exists_mock):
733                reload_module(psutil._pslinux)
734                ret = psutil.cpu_freq()
735                assert ret
736                assert flags
737                self.assertEqual(ret.max, 0.0)
738                self.assertEqual(ret.min, 0.0)
739                for freq in psutil.cpu_freq(percpu=True):
740                    self.assertEqual(ret.max, 0.0)
741                    self.assertEqual(ret.min, 0.0)
742        finally:
743            reload_module(psutil._pslinux)
744            reload_module(psutil)
745
746    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
747    def test_emulate_data(self):
748        def open_mock(name, *args, **kwargs):
749            if (name.endswith('/scaling_cur_freq') and
750                    name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
751                return io.BytesIO(b"500000")
752            elif (name.endswith('/scaling_min_freq') and
753                    name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
754                return io.BytesIO(b"600000")
755            elif (name.endswith('/scaling_max_freq') and
756                    name.startswith("/sys/devices/system/cpu/cpufreq/policy")):
757                return io.BytesIO(b"700000")
758            elif name == '/proc/cpuinfo':
759                return io.BytesIO(b"cpu MHz		: 500")
760            else:
761                return orig_open(name, *args, **kwargs)
762
763        orig_open = open
764        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
765        with mock.patch(patch_point, side_effect=open_mock):
766            with mock.patch(
767                    'os.path.exists', return_value=True):
768                freq = psutil.cpu_freq()
769                self.assertEqual(freq.current, 500.0)
770                # when /proc/cpuinfo is used min and max frequencies are not
771                # available and are set to 0.
772                if freq.min != 0.0:
773                    self.assertEqual(freq.min, 600.0)
774                if freq.max != 0.0:
775                    self.assertEqual(freq.max, 700.0)
776
777    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
778    def test_emulate_multi_cpu(self):
779        def open_mock(name, *args, **kwargs):
780            n = name
781            if (n.endswith('/scaling_cur_freq') and
782                    n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
783                return io.BytesIO(b"100000")
784            elif (n.endswith('/scaling_min_freq') and
785                    n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
786                return io.BytesIO(b"200000")
787            elif (n.endswith('/scaling_max_freq') and
788                    n.startswith("/sys/devices/system/cpu/cpufreq/policy0")):
789                return io.BytesIO(b"300000")
790            elif (n.endswith('/scaling_cur_freq') and
791                    n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
792                return io.BytesIO(b"400000")
793            elif (n.endswith('/scaling_min_freq') and
794                    n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
795                return io.BytesIO(b"500000")
796            elif (n.endswith('/scaling_max_freq') and
797                    n.startswith("/sys/devices/system/cpu/cpufreq/policy1")):
798                return io.BytesIO(b"600000")
799            elif name == '/proc/cpuinfo':
800                return io.BytesIO(b"cpu MHz		: 100\n"
801                                  b"cpu MHz		: 400")
802            else:
803                return orig_open(name, *args, **kwargs)
804
805        orig_open = open
806        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
807        with mock.patch(patch_point, side_effect=open_mock):
808            with mock.patch('os.path.exists', return_value=True):
809                with mock.patch('psutil._pslinux.cpu_count_logical',
810                                return_value=2):
811                    freq = psutil.cpu_freq(percpu=True)
812                    self.assertEqual(freq[0].current, 100.0)
813                    if freq[0].min != 0.0:
814                        self.assertEqual(freq[0].min, 200.0)
815                    if freq[0].max != 0.0:
816                        self.assertEqual(freq[0].max, 300.0)
817                    self.assertEqual(freq[1].current, 400.0)
818                    if freq[1].min != 0.0:
819                        self.assertEqual(freq[1].min, 500.0)
820                    if freq[1].max != 0.0:
821                        self.assertEqual(freq[1].max, 600.0)
822
823    @unittest.skipIf(TRAVIS, "fails on Travis")
824    @unittest.skipIf(not HAS_CPU_FREQ, "not supported")
825    def test_emulate_no_scaling_cur_freq_file(self):
826        # See: https://github.com/giampaolo/psutil/issues/1071
827        def open_mock(name, *args, **kwargs):
828            if name.endswith('/scaling_cur_freq'):
829                raise IOError(errno.ENOENT, "")
830            elif name.endswith('/cpuinfo_cur_freq'):
831                return io.BytesIO(b"200000")
832            elif name == '/proc/cpuinfo':
833                return io.BytesIO(b"cpu MHz		: 200")
834            else:
835                return orig_open(name, *args, **kwargs)
836
837        orig_open = open
838        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
839        with mock.patch(patch_point, side_effect=open_mock):
840            with mock.patch('os.path.exists', return_value=True):
841                with mock.patch('psutil._pslinux.cpu_count_logical',
842                                return_value=1):
843                    freq = psutil.cpu_freq()
844                    self.assertEqual(freq.current, 200)
845
846
847@unittest.skipIf(not LINUX, "LINUX only")
848class TestSystemCPUStats(unittest.TestCase):
849
850    @unittest.skipIf(TRAVIS, "fails on Travis")
851    def test_ctx_switches(self):
852        vmstat_value = vmstat("context switches")
853        psutil_value = psutil.cpu_stats().ctx_switches
854        self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
855
856    @unittest.skipIf(TRAVIS, "fails on Travis")
857    def test_interrupts(self):
858        vmstat_value = vmstat("interrupts")
859        psutil_value = psutil.cpu_stats().interrupts
860        self.assertAlmostEqual(vmstat_value, psutil_value, delta=500)
861
862
863@unittest.skipIf(not LINUX, "LINUX only")
864class TestLoadAvg(unittest.TestCase):
865
866    @unittest.skipIf(not HAS_GETLOADAVG, "not supported")
867    def test_getloadavg(self):
868        psutil_value = psutil.getloadavg()
869        with open("/proc/loadavg", "r") as f:
870            proc_value = f.read().split()
871
872        self.assertAlmostEqual(float(proc_value[0]), psutil_value[0], delta=1)
873        self.assertAlmostEqual(float(proc_value[1]), psutil_value[1], delta=1)
874        self.assertAlmostEqual(float(proc_value[2]), psutil_value[2], delta=1)
875
876
877# =====================================================================
878# --- system network
879# =====================================================================
880
881
882@unittest.skipIf(not LINUX, "LINUX only")
883class TestSystemNetIfAddrs(unittest.TestCase):
884
885    def test_ips(self):
886        for name, addrs in psutil.net_if_addrs().items():
887            for addr in addrs:
888                if addr.family == psutil.AF_LINK:
889                    self.assertEqual(addr.address, get_mac_address(name))
890                elif addr.family == socket.AF_INET:
891                    self.assertEqual(addr.address, get_ipv4_address(name))
892                # TODO: test for AF_INET6 family
893
894    # XXX - not reliable when having virtual NICs installed by Docker.
895    # @unittest.skipIf(not which('ip'), "'ip' utility not available")
896    # @unittest.skipIf(TRAVIS, "skipped on Travis")
897    # def test_net_if_names(self):
898    #     out = sh("ip addr").strip()
899    #     nics = [x for x in psutil.net_if_addrs().keys() if ':' not in x]
900    #     found = 0
901    #     for line in out.split('\n'):
902    #         line = line.strip()
903    #         if re.search(r"^\d+:", line):
904    #             found += 1
905    #             name = line.split(':')[1].strip()
906    #             self.assertIn(name, nics)
907    #     self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
908    #         pprint.pformat(nics), out))
909
910
911@unittest.skipIf(not LINUX, "LINUX only")
912class TestSystemNetIfStats(unittest.TestCase):
913
914    def test_against_ifconfig(self):
915        for name, stats in psutil.net_if_stats().items():
916            try:
917                out = sh("ifconfig %s" % name)
918            except RuntimeError:
919                pass
920            else:
921                # Not always reliable.
922                # self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
923                self.assertEqual(stats.mtu,
924                                 int(re.findall(r'(?i)MTU[: ](\d+)', out)[0]))
925
926
927@unittest.skipIf(not LINUX, "LINUX only")
928class TestSystemNetIOCounters(unittest.TestCase):
929
930    @retry_on_failure()
931    def test_against_ifconfig(self):
932        def ifconfig(nic):
933            ret = {}
934            out = sh("ifconfig %s" % name)
935            ret['packets_recv'] = int(
936                re.findall(r'RX packets[: ](\d+)', out)[0])
937            ret['packets_sent'] = int(
938                re.findall(r'TX packets[: ](\d+)', out)[0])
939            ret['errin'] = int(re.findall(r'errors[: ](\d+)', out)[0])
940            ret['errout'] = int(re.findall(r'errors[: ](\d+)', out)[1])
941            ret['dropin'] = int(re.findall(r'dropped[: ](\d+)', out)[0])
942            ret['dropout'] = int(re.findall(r'dropped[: ](\d+)', out)[1])
943            ret['bytes_recv'] = int(
944                re.findall(r'RX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
945            ret['bytes_sent'] = int(
946                re.findall(r'TX (?:packets \d+ +)?bytes[: ](\d+)', out)[0])
947            return ret
948
949        nio = psutil.net_io_counters(pernic=True, nowrap=False)
950        for name, stats in nio.items():
951            try:
952                ifconfig_ret = ifconfig(name)
953            except RuntimeError:
954                continue
955            self.assertAlmostEqual(
956                stats.bytes_recv, ifconfig_ret['bytes_recv'], delta=1024 * 5)
957            self.assertAlmostEqual(
958                stats.bytes_sent, ifconfig_ret['bytes_sent'], delta=1024 * 5)
959            self.assertAlmostEqual(
960                stats.packets_recv, ifconfig_ret['packets_recv'], delta=1024)
961            self.assertAlmostEqual(
962                stats.packets_sent, ifconfig_ret['packets_sent'], delta=1024)
963            self.assertAlmostEqual(
964                stats.errin, ifconfig_ret['errin'], delta=10)
965            self.assertAlmostEqual(
966                stats.errout, ifconfig_ret['errout'], delta=10)
967            self.assertAlmostEqual(
968                stats.dropin, ifconfig_ret['dropin'], delta=10)
969            self.assertAlmostEqual(
970                stats.dropout, ifconfig_ret['dropout'], delta=10)
971
972
973@unittest.skipIf(not LINUX, "LINUX only")
974class TestSystemNetConnections(unittest.TestCase):
975
976    @mock.patch('psutil._pslinux.socket.inet_ntop', side_effect=ValueError)
977    @mock.patch('psutil._pslinux.supports_ipv6', return_value=False)
978    def test_emulate_ipv6_unsupported(self, supports_ipv6, inet_ntop):
979        # see: https://github.com/giampaolo/psutil/issues/623
980        try:
981            s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
982            self.addCleanup(s.close)
983            s.bind(("::1", 0))
984        except socket.error:
985            pass
986        psutil.net_connections(kind='inet6')
987
988    def test_emulate_unix(self):
989        with mock_open_content(
990            '/proc/net/unix',
991            textwrap.dedent("""\
992                0: 00000003 000 000 0001 03 462170 @/tmp/dbus-Qw2hMPIU3n
993                0: 00000003 000 000 0001 03 35010 @/tmp/dbus-tB2X8h69BQ
994                0: 00000003 000 000 0001 03 34424 @/tmp/dbus-cHy80Y8O
995                000000000000000000000000000000000000000000000000000000
996                """)) as m:
997            psutil.net_connections(kind='unix')
998            assert m.called
999
1000
1001# =====================================================================
1002# --- system disks
1003# =====================================================================
1004
1005
1006@unittest.skipIf(not LINUX, "LINUX only")
1007class TestSystemDiskPartitions(unittest.TestCase):
1008
1009    @unittest.skipIf(not hasattr(os, 'statvfs'), "os.statvfs() not available")
1010    @skip_on_not_implemented()
1011    def test_against_df(self):
1012        # test psutil.disk_usage() and psutil.disk_partitions()
1013        # against "df -a"
1014        def df(path):
1015            out = sh('df -P -B 1 "%s"' % path).strip()
1016            lines = out.split('\n')
1017            lines.pop(0)
1018            line = lines.pop(0)
1019            dev, total, used, free = line.split()[:4]
1020            if dev == 'none':
1021                dev = ''
1022            total, used, free = int(total), int(used), int(free)
1023            return dev, total, used, free
1024
1025        for part in psutil.disk_partitions(all=False):
1026            usage = psutil.disk_usage(part.mountpoint)
1027            dev, total, used, free = df(part.mountpoint)
1028            self.assertEqual(usage.total, total)
1029            # 10 MB tollerance
1030            if abs(usage.free - free) > 10 * 1024 * 1024:
1031                self.fail("psutil=%s, df=%s" % (usage.free, free))
1032            if abs(usage.used - used) > 10 * 1024 * 1024:
1033                self.fail("psutil=%s, df=%s" % (usage.used, used))
1034
1035    def test_zfs_fs(self):
1036        # Test that ZFS partitions are returned.
1037        with open("/proc/filesystems", "r") as f:
1038            data = f.read()
1039        if 'zfs' in data:
1040            for part in psutil.disk_partitions():
1041                if part.fstype == 'zfs':
1042                    break
1043            else:
1044                self.fail("couldn't find any ZFS partition")
1045        else:
1046            # No ZFS partitions on this system. Let's fake one.
1047            fake_file = io.StringIO(u("nodev\tzfs\n"))
1048            with mock.patch('psutil._common.open',
1049                            return_value=fake_file, create=True) as m1:
1050                with mock.patch(
1051                        'psutil._pslinux.cext.disk_partitions',
1052                        return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
1053                    ret = psutil.disk_partitions()
1054                    assert m1.called
1055                    assert m2.called
1056                    assert ret
1057                    self.assertEqual(ret[0].fstype, 'zfs')
1058
1059    def test_emulate_realpath_fail(self):
1060        # See: https://github.com/giampaolo/psutil/issues/1307
1061        try:
1062            with mock.patch('os.path.realpath',
1063                            return_value='/non/existent') as m:
1064                with self.assertRaises(FileNotFoundError):
1065                    psutil.disk_partitions()
1066                assert m.called
1067        finally:
1068            psutil.PROCFS_PATH = "/proc"
1069
1070
1071@unittest.skipIf(not LINUX, "LINUX only")
1072class TestSystemDiskIoCounters(unittest.TestCase):
1073
1074    def test_emulate_kernel_2_4(self):
1075        # Tests /proc/diskstats parsing format for 2.4 kernels, see:
1076        # https://github.com/giampaolo/psutil/issues/767
1077        with mock_open_content(
1078                '/proc/diskstats',
1079                "   3     0   1 hda 2 3 4 5 6 7 8 9 10 11 12"):
1080            with mock.patch('psutil._pslinux.is_storage_device',
1081                            return_value=True):
1082                ret = psutil.disk_io_counters(nowrap=False)
1083                self.assertEqual(ret.read_count, 1)
1084                self.assertEqual(ret.read_merged_count, 2)
1085                self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
1086                self.assertEqual(ret.read_time, 4)
1087                self.assertEqual(ret.write_count, 5)
1088                self.assertEqual(ret.write_merged_count, 6)
1089                self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
1090                self.assertEqual(ret.write_time, 8)
1091                self.assertEqual(ret.busy_time, 10)
1092
1093    def test_emulate_kernel_2_6_full(self):
1094        # Tests /proc/diskstats parsing format for 2.6 kernels,
1095        # lines reporting all metrics:
1096        # https://github.com/giampaolo/psutil/issues/767
1097        with mock_open_content(
1098                '/proc/diskstats',
1099                "   3    0   hda 1 2 3 4 5 6 7 8 9 10 11"):
1100            with mock.patch('psutil._pslinux.is_storage_device',
1101                            return_value=True):
1102                ret = psutil.disk_io_counters(nowrap=False)
1103                self.assertEqual(ret.read_count, 1)
1104                self.assertEqual(ret.read_merged_count, 2)
1105                self.assertEqual(ret.read_bytes, 3 * SECTOR_SIZE)
1106                self.assertEqual(ret.read_time, 4)
1107                self.assertEqual(ret.write_count, 5)
1108                self.assertEqual(ret.write_merged_count, 6)
1109                self.assertEqual(ret.write_bytes, 7 * SECTOR_SIZE)
1110                self.assertEqual(ret.write_time, 8)
1111                self.assertEqual(ret.busy_time, 10)
1112
1113    def test_emulate_kernel_2_6_limited(self):
1114        # Tests /proc/diskstats parsing format for 2.6 kernels,
1115        # where one line of /proc/partitions return a limited
1116        # amount of metrics when it bumps into a partition
1117        # (instead of a disk). See:
1118        # https://github.com/giampaolo/psutil/issues/767
1119        with mock_open_content(
1120                '/proc/diskstats',
1121                "   3    1   hda 1 2 3 4"):
1122            with mock.patch('psutil._pslinux.is_storage_device',
1123                            return_value=True):
1124                ret = psutil.disk_io_counters(nowrap=False)
1125                self.assertEqual(ret.read_count, 1)
1126                self.assertEqual(ret.read_bytes, 2 * SECTOR_SIZE)
1127                self.assertEqual(ret.write_count, 3)
1128                self.assertEqual(ret.write_bytes, 4 * SECTOR_SIZE)
1129
1130                self.assertEqual(ret.read_merged_count, 0)
1131                self.assertEqual(ret.read_time, 0)
1132                self.assertEqual(ret.write_merged_count, 0)
1133                self.assertEqual(ret.write_time, 0)
1134                self.assertEqual(ret.busy_time, 0)
1135
1136    def test_emulate_include_partitions(self):
1137        # Make sure that when perdisk=True disk partitions are returned,
1138        # see:
1139        # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
1140        with mock_open_content(
1141                '/proc/diskstats',
1142                textwrap.dedent("""\
1143                    3    0   nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1144                    3    0   nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1145                    """)):
1146            with mock.patch('psutil._pslinux.is_storage_device',
1147                            return_value=False):
1148                ret = psutil.disk_io_counters(perdisk=True, nowrap=False)
1149                self.assertEqual(len(ret), 2)
1150                self.assertEqual(ret['nvme0n1'].read_count, 1)
1151                self.assertEqual(ret['nvme0n1p1'].read_count, 1)
1152                self.assertEqual(ret['nvme0n1'].write_count, 5)
1153                self.assertEqual(ret['nvme0n1p1'].write_count, 5)
1154
1155    def test_emulate_exclude_partitions(self):
1156        # Make sure that when perdisk=False partitions (e.g. 'sda1',
1157        # 'nvme0n1p1') are skipped and not included in the total count.
1158        # https://github.com/giampaolo/psutil/pull/1313#issuecomment-408626842
1159        with mock_open_content(
1160                '/proc/diskstats',
1161                textwrap.dedent("""\
1162                    3    0   nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1163                    3    0   nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1164                    """)):
1165            with mock.patch('psutil._pslinux.is_storage_device',
1166                            return_value=False):
1167                ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
1168                self.assertIsNone(ret)
1169
1170        #
1171        def is_storage_device(name):
1172            return name == 'nvme0n1'
1173
1174        with mock_open_content(
1175                '/proc/diskstats',
1176                textwrap.dedent("""\
1177                    3    0   nvme0n1 1 2 3 4 5 6 7 8 9 10 11
1178                    3    0   nvme0n1p1 1 2 3 4 5 6 7 8 9 10 11
1179                    """)):
1180            with mock.patch('psutil._pslinux.is_storage_device',
1181                            create=True, side_effect=is_storage_device):
1182                ret = psutil.disk_io_counters(perdisk=False, nowrap=False)
1183                self.assertEqual(ret.read_count, 1)
1184                self.assertEqual(ret.write_count, 5)
1185
1186    def test_emulate_use_sysfs(self):
1187        def exists(path):
1188            if path == '/proc/diskstats':
1189                return False
1190            return True
1191
1192        wprocfs = psutil.disk_io_counters(perdisk=True)
1193        with mock.patch('psutil._pslinux.os.path.exists',
1194                        create=True, side_effect=exists):
1195            wsysfs = psutil.disk_io_counters(perdisk=True)
1196        self.assertEqual(len(wprocfs), len(wsysfs))
1197
1198    def test_emulate_not_impl(self):
1199        def exists(path):
1200            return False
1201
1202        with mock.patch('psutil._pslinux.os.path.exists',
1203                        create=True, side_effect=exists):
1204            self.assertRaises(NotImplementedError, psutil.disk_io_counters)
1205
1206
1207# =====================================================================
1208# --- misc
1209# =====================================================================
1210
1211
1212@unittest.skipIf(not LINUX, "LINUX only")
1213class TestMisc(unittest.TestCase):
1214
1215    def test_boot_time(self):
1216        vmstat_value = vmstat('boot time')
1217        psutil_value = psutil.boot_time()
1218        self.assertEqual(int(vmstat_value), int(psutil_value))
1219
1220    def test_no_procfs_on_import(self):
1221        my_procfs = tempfile.mkdtemp()
1222
1223        with open(os.path.join(my_procfs, 'stat'), 'w') as f:
1224            f.write('cpu   0 0 0 0 0 0 0 0 0 0\n')
1225            f.write('cpu0  0 0 0 0 0 0 0 0 0 0\n')
1226            f.write('cpu1  0 0 0 0 0 0 0 0 0 0\n')
1227
1228        try:
1229            orig_open = open
1230
1231            def open_mock(name, *args, **kwargs):
1232                if name.startswith('/proc'):
1233                    raise IOError(errno.ENOENT, 'rejecting access for test')
1234                return orig_open(name, *args, **kwargs)
1235
1236            patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1237            with mock.patch(patch_point, side_effect=open_mock):
1238                reload_module(psutil)
1239
1240                self.assertRaises(IOError, psutil.cpu_times)
1241                self.assertRaises(IOError, psutil.cpu_times, percpu=True)
1242                self.assertRaises(IOError, psutil.cpu_percent)
1243                self.assertRaises(IOError, psutil.cpu_percent, percpu=True)
1244                self.assertRaises(IOError, psutil.cpu_times_percent)
1245                self.assertRaises(
1246                    IOError, psutil.cpu_times_percent, percpu=True)
1247
1248                psutil.PROCFS_PATH = my_procfs
1249
1250                self.assertEqual(psutil.cpu_percent(), 0)
1251                self.assertEqual(sum(psutil.cpu_times_percent()), 0)
1252
1253                # since we don't know the number of CPUs at import time,
1254                # we awkwardly say there are none until the second call
1255                per_cpu_percent = psutil.cpu_percent(percpu=True)
1256                self.assertEqual(sum(per_cpu_percent), 0)
1257
1258                # ditto awkward length
1259                per_cpu_times_percent = psutil.cpu_times_percent(percpu=True)
1260                self.assertEqual(sum(map(sum, per_cpu_times_percent)), 0)
1261
1262                # much user, very busy
1263                with open(os.path.join(my_procfs, 'stat'), 'w') as f:
1264                    f.write('cpu   1 0 0 0 0 0 0 0 0 0\n')
1265                    f.write('cpu0  1 0 0 0 0 0 0 0 0 0\n')
1266                    f.write('cpu1  1 0 0 0 0 0 0 0 0 0\n')
1267
1268                self.assertNotEqual(psutil.cpu_percent(), 0)
1269                self.assertNotEqual(
1270                    sum(psutil.cpu_percent(percpu=True)), 0)
1271                self.assertNotEqual(sum(psutil.cpu_times_percent()), 0)
1272                self.assertNotEqual(
1273                    sum(map(sum, psutil.cpu_times_percent(percpu=True))), 0)
1274        finally:
1275            shutil.rmtree(my_procfs)
1276            reload_module(psutil)
1277
1278        self.assertEqual(psutil.PROCFS_PATH, '/proc')
1279
1280    def test_cpu_steal_decrease(self):
1281        # Test cumulative cpu stats decrease. We should ignore this.
1282        # See issue #1210.
1283        with mock_open_content(
1284            "/proc/stat",
1285            textwrap.dedent("""\
1286                cpu   0 0 0 0 0 0 0 1 0 0
1287                cpu0  0 0 0 0 0 0 0 1 0 0
1288                cpu1  0 0 0 0 0 0 0 1 0 0
1289                """).encode()) as m:
1290            # first call to "percent" functions should read the new stat file
1291            # and compare to the "real" file read at import time - so the
1292            # values are meaningless
1293            psutil.cpu_percent()
1294            assert m.called
1295            psutil.cpu_percent(percpu=True)
1296            psutil.cpu_times_percent()
1297            psutil.cpu_times_percent(percpu=True)
1298
1299        with mock_open_content(
1300            "/proc/stat",
1301            textwrap.dedent("""\
1302                cpu   1 0 0 0 0 0 0 0 0 0
1303                cpu0  1 0 0 0 0 0 0 0 0 0
1304                cpu1  1 0 0 0 0 0 0 0 0 0
1305                """).encode()) as m:
1306            # Increase "user" while steal goes "backwards" to zero.
1307            cpu_percent = psutil.cpu_percent()
1308            assert m.called
1309            cpu_percent_percpu = psutil.cpu_percent(percpu=True)
1310            cpu_times_percent = psutil.cpu_times_percent()
1311            cpu_times_percent_percpu = psutil.cpu_times_percent(percpu=True)
1312            self.assertNotEqual(cpu_percent, 0)
1313            self.assertNotEqual(sum(cpu_percent_percpu), 0)
1314            self.assertNotEqual(sum(cpu_times_percent), 0)
1315            self.assertNotEqual(sum(cpu_times_percent), 100.0)
1316            self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 0)
1317            self.assertNotEqual(sum(map(sum, cpu_times_percent_percpu)), 100.0)
1318            self.assertEqual(cpu_times_percent.steal, 0)
1319            self.assertNotEqual(cpu_times_percent.user, 0)
1320
1321    def test_boot_time_mocked(self):
1322        with mock.patch('psutil._common.open', create=True) as m:
1323            self.assertRaises(
1324                RuntimeError,
1325                psutil._pslinux.boot_time)
1326            assert m.called
1327
1328    def test_users_mocked(self):
1329        # Make sure ':0' and ':0.0' (returned by C ext) are converted
1330        # to 'localhost'.
1331        with mock.patch('psutil._pslinux.cext.users',
1332                        return_value=[('giampaolo', 'pts/2', ':0',
1333                                       1436573184.0, True, 2)]) as m:
1334            self.assertEqual(psutil.users()[0].host, 'localhost')
1335            assert m.called
1336        with mock.patch('psutil._pslinux.cext.users',
1337                        return_value=[('giampaolo', 'pts/2', ':0.0',
1338                                       1436573184.0, True, 2)]) as m:
1339            self.assertEqual(psutil.users()[0].host, 'localhost')
1340            assert m.called
1341        # ...otherwise it should be returned as-is
1342        with mock.patch('psutil._pslinux.cext.users',
1343                        return_value=[('giampaolo', 'pts/2', 'foo',
1344                                       1436573184.0, True, 2)]) as m:
1345            self.assertEqual(psutil.users()[0].host, 'foo')
1346            assert m.called
1347
1348    def test_procfs_path(self):
1349        tdir = tempfile.mkdtemp()
1350        try:
1351            psutil.PROCFS_PATH = tdir
1352            self.assertRaises(IOError, psutil.virtual_memory)
1353            self.assertRaises(IOError, psutil.cpu_times)
1354            self.assertRaises(IOError, psutil.cpu_times, percpu=True)
1355            self.assertRaises(IOError, psutil.boot_time)
1356            # self.assertRaises(IOError, psutil.pids)
1357            self.assertRaises(IOError, psutil.net_connections)
1358            self.assertRaises(IOError, psutil.net_io_counters)
1359            self.assertRaises(IOError, psutil.net_if_stats)
1360            # self.assertRaises(IOError, psutil.disk_io_counters)
1361            self.assertRaises(IOError, psutil.disk_partitions)
1362            self.assertRaises(psutil.NoSuchProcess, psutil.Process)
1363        finally:
1364            psutil.PROCFS_PATH = "/proc"
1365            os.rmdir(tdir)
1366
1367    def test_issue_687(self):
1368        # In case of thread ID:
1369        # - pid_exists() is supposed to return False
1370        # - Process(tid) is supposed to work
1371        # - pids() should not return the TID
1372        # See: https://github.com/giampaolo/psutil/issues/687
1373        t = ThreadTask()
1374        t.start()
1375        try:
1376            p = psutil.Process()
1377            tid = p.threads()[1].id
1378            assert not psutil.pid_exists(tid), tid
1379            pt = psutil.Process(tid)
1380            pt.as_dict()
1381            self.assertNotIn(tid, psutil.pids())
1382        finally:
1383            t.stop()
1384
1385    def test_pid_exists_no_proc_status(self):
1386        # Internally pid_exists relies on /proc/{pid}/status.
1387        # Emulate a case where this file is empty in which case
1388        # psutil is supposed to fall back on using pids().
1389        with mock_open_content("/proc/%s/status", "") as m:
1390            assert psutil.pid_exists(os.getpid())
1391            assert m.called
1392
1393
1394# =====================================================================
1395# --- sensors
1396# =====================================================================
1397
1398
1399@unittest.skipIf(not LINUX, "LINUX only")
1400@unittest.skipIf(not HAS_BATTERY, "no battery")
1401class TestSensorsBattery(unittest.TestCase):
1402
1403    @unittest.skipIf(not which("acpi"), "acpi utility not available")
1404    def test_percent(self):
1405        out = sh("acpi -b")
1406        acpi_value = int(out.split(",")[1].strip().replace('%', ''))
1407        psutil_value = psutil.sensors_battery().percent
1408        self.assertAlmostEqual(acpi_value, psutil_value, delta=1)
1409
1410    @unittest.skipIf(not which("acpi"), "acpi utility not available")
1411    def test_power_plugged(self):
1412        out = sh("acpi -b")
1413        if 'unknown' in out.lower():
1414            return unittest.skip("acpi output not reliable")
1415        if 'discharging at zero rate' in out:
1416            plugged = True
1417        else:
1418            plugged = "Charging" in out.split('\n')[0]
1419        self.assertEqual(psutil.sensors_battery().power_plugged, plugged)
1420
1421    def test_emulate_power_plugged(self):
1422        # Pretend the AC power cable is connected.
1423        def open_mock(name, *args, **kwargs):
1424            if name.endswith("AC0/online") or name.endswith("AC/online"):
1425                return io.BytesIO(b"1")
1426            else:
1427                return orig_open(name, *args, **kwargs)
1428
1429        orig_open = open
1430        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1431        with mock.patch(patch_point, side_effect=open_mock) as m:
1432            self.assertEqual(psutil.sensors_battery().power_plugged, True)
1433            self.assertEqual(
1434                psutil.sensors_battery().secsleft, psutil.POWER_TIME_UNLIMITED)
1435            assert m.called
1436
1437    def test_emulate_power_plugged_2(self):
1438        # Same as above but pretend /AC0/online does not exist in which
1439        # case code relies on /status file.
1440        def open_mock(name, *args, **kwargs):
1441            if name.endswith("AC0/online") or name.endswith("AC/online"):
1442                raise IOError(errno.ENOENT, "")
1443            elif name.endswith("/status"):
1444                return io.StringIO(u("charging"))
1445            else:
1446                return orig_open(name, *args, **kwargs)
1447
1448        orig_open = open
1449        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1450        with mock.patch(patch_point, side_effect=open_mock) as m:
1451            self.assertEqual(psutil.sensors_battery().power_plugged, True)
1452            assert m.called
1453
1454    def test_emulate_power_not_plugged(self):
1455        # Pretend the AC power cable is not connected.
1456        def open_mock(name, *args, **kwargs):
1457            if name.endswith("AC0/online") or name.endswith("AC/online"):
1458                return io.BytesIO(b"0")
1459            else:
1460                return orig_open(name, *args, **kwargs)
1461
1462        orig_open = open
1463        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1464        with mock.patch(patch_point, side_effect=open_mock) as m:
1465            self.assertEqual(psutil.sensors_battery().power_plugged, False)
1466            assert m.called
1467
1468    def test_emulate_power_not_plugged_2(self):
1469        # Same as above but pretend /AC0/online does not exist in which
1470        # case code relies on /status file.
1471        def open_mock(name, *args, **kwargs):
1472            if name.endswith("AC0/online") or name.endswith("AC/online"):
1473                raise IOError(errno.ENOENT, "")
1474            elif name.endswith("/status"):
1475                return io.StringIO(u("discharging"))
1476            else:
1477                return orig_open(name, *args, **kwargs)
1478
1479        orig_open = open
1480        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1481        with mock.patch(patch_point, side_effect=open_mock) as m:
1482            self.assertEqual(psutil.sensors_battery().power_plugged, False)
1483            assert m.called
1484
1485    def test_emulate_power_undetermined(self):
1486        # Pretend we can't know whether the AC power cable not
1487        # connected (assert fallback to False).
1488        def open_mock(name, *args, **kwargs):
1489            if name.startswith("/sys/class/power_supply/AC0/online") or \
1490                    name.startswith("/sys/class/power_supply/AC/online"):
1491                raise IOError(errno.ENOENT, "")
1492            elif name.startswith("/sys/class/power_supply/BAT0/status"):
1493                return io.BytesIO(b"???")
1494            else:
1495                return orig_open(name, *args, **kwargs)
1496
1497        orig_open = open
1498        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1499        with mock.patch(patch_point, side_effect=open_mock) as m:
1500            self.assertIsNone(psutil.sensors_battery().power_plugged)
1501            assert m.called
1502
1503    def test_emulate_no_base_files(self):
1504        # Emulate a case where base metrics files are not present,
1505        # in which case we're supposed to get None.
1506        with mock_open_exception(
1507                "/sys/class/power_supply/BAT0/energy_now",
1508                IOError(errno.ENOENT, "")):
1509            with mock_open_exception(
1510                    "/sys/class/power_supply/BAT0/charge_now",
1511                    IOError(errno.ENOENT, "")):
1512                self.assertIsNone(psutil.sensors_battery())
1513
1514    def test_emulate_energy_full_0(self):
1515        # Emulate a case where energy_full files returns 0.
1516        with mock_open_content(
1517                "/sys/class/power_supply/BAT0/energy_full", b"0") as m:
1518            self.assertEqual(psutil.sensors_battery().percent, 0)
1519            assert m.called
1520
1521    def test_emulate_energy_full_not_avail(self):
1522        # Emulate a case where energy_full file does not exist.
1523        # Expected fallback on /capacity.
1524        with mock_open_exception(
1525                "/sys/class/power_supply/BAT0/energy_full",
1526                IOError(errno.ENOENT, "")):
1527            with mock_open_exception(
1528                    "/sys/class/power_supply/BAT0/charge_full",
1529                    IOError(errno.ENOENT, "")):
1530                with mock_open_content(
1531                        "/sys/class/power_supply/BAT0/capacity", b"88"):
1532                    self.assertEqual(psutil.sensors_battery().percent, 88)
1533
1534    def test_emulate_no_power(self):
1535        # Emulate a case where /AC0/online file nor /BAT0/status exist.
1536        with mock_open_exception(
1537                "/sys/class/power_supply/AC/online",
1538                IOError(errno.ENOENT, "")):
1539            with mock_open_exception(
1540                    "/sys/class/power_supply/AC0/online",
1541                    IOError(errno.ENOENT, "")):
1542                with mock_open_exception(
1543                        "/sys/class/power_supply/BAT0/status",
1544                        IOError(errno.ENOENT, "")):
1545                    self.assertIsNone(psutil.sensors_battery().power_plugged)
1546
1547
1548@unittest.skipIf(not LINUX, "LINUX only")
1549class TestSensorsTemperatures(unittest.TestCase):
1550
1551    def test_emulate_class_hwmon(self):
1552        def open_mock(name, *args, **kwargs):
1553            if name.endswith('/name'):
1554                return io.StringIO(u("name"))
1555            elif name.endswith('/temp1_label'):
1556                return io.StringIO(u("label"))
1557            elif name.endswith('/temp1_input'):
1558                return io.BytesIO(b"30000")
1559            elif name.endswith('/temp1_max'):
1560                return io.BytesIO(b"40000")
1561            elif name.endswith('/temp1_crit'):
1562                return io.BytesIO(b"50000")
1563            else:
1564                return orig_open(name, *args, **kwargs)
1565
1566        orig_open = open
1567        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1568        with mock.patch(patch_point, side_effect=open_mock):
1569            # Test case with /sys/class/hwmon
1570            with mock.patch('glob.glob',
1571                            return_value=['/sys/class/hwmon/hwmon0/temp1']):
1572                temp = psutil.sensors_temperatures()['name'][0]
1573                self.assertEqual(temp.label, 'label')
1574                self.assertEqual(temp.current, 30.0)
1575                self.assertEqual(temp.high, 40.0)
1576                self.assertEqual(temp.critical, 50.0)
1577
1578    def test_emulate_class_thermal(self):
1579        def open_mock(name, *args, **kwargs):
1580            if name.endswith('0_temp'):
1581                return io.BytesIO(b"50000")
1582            elif name.endswith('temp'):
1583                return io.BytesIO(b"30000")
1584            elif name.endswith('0_type'):
1585                return io.StringIO(u("critical"))
1586            elif name.endswith('type'):
1587                return io.StringIO(u("name"))
1588            else:
1589                return orig_open(name, *args, **kwargs)
1590
1591        def glob_mock(path):
1592            if path == '/sys/class/hwmon/hwmon*/temp*_*':
1593                return []
1594            elif path == '/sys/class/hwmon/hwmon*/device/temp*_*':
1595                return []
1596            elif path == '/sys/class/thermal/thermal_zone*':
1597                return ['/sys/class/thermal/thermal_zone0']
1598            elif path == '/sys/class/thermal/thermal_zone0/trip_point*':
1599                return ['/sys/class/thermal/thermal_zone1/trip_point_0_type',
1600                        '/sys/class/thermal/thermal_zone1/trip_point_0_temp']
1601            return []
1602
1603        orig_open = open
1604        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1605        with mock.patch(patch_point, side_effect=open_mock):
1606            with mock.patch('glob.glob', create=True, side_effect=glob_mock):
1607                temp = psutil.sensors_temperatures()['name'][0]
1608                self.assertEqual(temp.label, '')
1609                self.assertEqual(temp.current, 30.0)
1610                self.assertEqual(temp.high, 50.0)
1611                self.assertEqual(temp.critical, 50.0)
1612
1613
1614@unittest.skipIf(not LINUX, "LINUX only")
1615class TestSensorsFans(unittest.TestCase):
1616
1617    def test_emulate_data(self):
1618        def open_mock(name, *args, **kwargs):
1619            if name.endswith('/name'):
1620                return io.StringIO(u("name"))
1621            elif name.endswith('/fan1_label'):
1622                return io.StringIO(u("label"))
1623            elif name.endswith('/fan1_input'):
1624                return io.StringIO(u("2000"))
1625            else:
1626                return orig_open(name, *args, **kwargs)
1627
1628        orig_open = open
1629        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1630        with mock.patch(patch_point, side_effect=open_mock):
1631            with mock.patch('glob.glob',
1632                            return_value=['/sys/class/hwmon/hwmon2/fan1']):
1633                fan = psutil.sensors_fans()['name'][0]
1634                self.assertEqual(fan.label, 'label')
1635                self.assertEqual(fan.current, 2000)
1636
1637
1638# =====================================================================
1639# --- test process
1640# =====================================================================
1641
1642
1643@unittest.skipIf(not LINUX, "LINUX only")
1644class TestProcess(unittest.TestCase):
1645
1646    def setUp(self):
1647        safe_rmpath(TESTFN)
1648
1649    tearDown = setUp
1650
1651    def test_memory_full_info(self):
1652        src = textwrap.dedent("""
1653            import time
1654            with open("%s", "w") as f:
1655                time.sleep(10)
1656            """ % TESTFN)
1657        sproc = pyrun(src)
1658        self.addCleanup(reap_children)
1659        call_until(lambda: os.listdir('.'), "'%s' not in ret" % TESTFN)
1660        p = psutil.Process(sproc.pid)
1661        time.sleep(.1)
1662        mem = p.memory_full_info()
1663        maps = p.memory_maps(grouped=False)
1664        self.assertAlmostEqual(
1665            mem.uss, sum([x.private_dirty + x.private_clean for x in maps]),
1666            delta=4096)
1667        self.assertAlmostEqual(
1668            mem.pss, sum([x.pss for x in maps]), delta=4096)
1669        self.assertAlmostEqual(
1670            mem.swap, sum([x.swap for x in maps]), delta=4096)
1671
1672    def test_memory_full_info_mocked(self):
1673        # See: https://github.com/giampaolo/psutil/issues/1222
1674        with mock_open_content(
1675            "/proc/%s/smaps" % os.getpid(),
1676            textwrap.dedent("""\
1677                fffff0 r-xp 00000000 00:00 0                  [vsyscall]
1678                Size:                  1 kB
1679                Rss:                   2 kB
1680                Pss:                   3 kB
1681                Shared_Clean:          4 kB
1682                Shared_Dirty:          5 kB
1683                Private_Clean:         6 kB
1684                Private_Dirty:         7 kB
1685                Referenced:            8 kB
1686                Anonymous:             9 kB
1687                LazyFree:              10 kB
1688                AnonHugePages:         11 kB
1689                ShmemPmdMapped:        12 kB
1690                Shared_Hugetlb:        13 kB
1691                Private_Hugetlb:       14 kB
1692                Swap:                  15 kB
1693                SwapPss:               16 kB
1694                KernelPageSize:        17 kB
1695                MMUPageSize:           18 kB
1696                Locked:                19 kB
1697                VmFlags: rd ex
1698                """).encode()) as m:
1699            p = psutil.Process()
1700            mem = p.memory_full_info()
1701            assert m.called
1702            self.assertEqual(mem.uss, (6 + 7 + 14) * 1024)
1703            self.assertEqual(mem.pss, 3 * 1024)
1704            self.assertEqual(mem.swap, 15 * 1024)
1705
1706    # On PYPY file descriptors are not closed fast enough.
1707    @unittest.skipIf(PYPY, "unreliable on PYPY")
1708    def test_open_files_mode(self):
1709        def get_test_file():
1710            p = psutil.Process()
1711            giveup_at = time.time() + 2
1712            while True:
1713                for file in p.open_files():
1714                    if file.path == os.path.abspath(TESTFN):
1715                        return file
1716                    elif time.time() > giveup_at:
1717                        break
1718            raise RuntimeError("timeout looking for test file")
1719
1720        #
1721        with open(TESTFN, "w"):
1722            self.assertEqual(get_test_file().mode, "w")
1723        with open(TESTFN, "r"):
1724            self.assertEqual(get_test_file().mode, "r")
1725        with open(TESTFN, "a"):
1726            self.assertEqual(get_test_file().mode, "a")
1727        #
1728        with open(TESTFN, "r+"):
1729            self.assertEqual(get_test_file().mode, "r+")
1730        with open(TESTFN, "w+"):
1731            self.assertEqual(get_test_file().mode, "r+")
1732        with open(TESTFN, "a+"):
1733            self.assertEqual(get_test_file().mode, "a+")
1734        # note: "x" bit is not supported
1735        if PY3:
1736            safe_rmpath(TESTFN)
1737            with open(TESTFN, "x"):
1738                self.assertEqual(get_test_file().mode, "w")
1739            safe_rmpath(TESTFN)
1740            with open(TESTFN, "x+"):
1741                self.assertEqual(get_test_file().mode, "r+")
1742
1743    def test_open_files_file_gone(self):
1744        # simulates a file which gets deleted during open_files()
1745        # execution
1746        p = psutil.Process()
1747        files = p.open_files()
1748        with tempfile.NamedTemporaryFile():
1749            # give the kernel some time to see the new file
1750            call_until(p.open_files, "len(ret) != %i" % len(files))
1751            with mock.patch('psutil._pslinux.os.readlink',
1752                            side_effect=OSError(errno.ENOENT, "")) as m:
1753                files = p.open_files()
1754                assert not files
1755                assert m.called
1756            # also simulate the case where os.readlink() returns EINVAL
1757            # in which case psutil is supposed to 'continue'
1758            with mock.patch('psutil._pslinux.os.readlink',
1759                            side_effect=OSError(errno.EINVAL, "")) as m:
1760                self.assertEqual(p.open_files(), [])
1761                assert m.called
1762
1763    def test_open_files_fd_gone(self):
1764        # Simulate a case where /proc/{pid}/fdinfo/{fd} disappears
1765        # while iterating through fds.
1766        # https://travis-ci.org/giampaolo/psutil/jobs/225694530
1767        p = psutil.Process()
1768        files = p.open_files()
1769        with tempfile.NamedTemporaryFile():
1770            # give the kernel some time to see the new file
1771            call_until(p.open_files, "len(ret) != %i" % len(files))
1772            patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1773            with mock.patch(patch_point,
1774                            side_effect=IOError(errno.ENOENT, "")) as m:
1775                files = p.open_files()
1776                assert not files
1777                assert m.called
1778
1779    # --- mocked tests
1780
1781    def test_terminal_mocked(self):
1782        with mock.patch('psutil._pslinux._psposix.get_terminal_map',
1783                        return_value={}) as m:
1784            self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
1785            assert m.called
1786
1787    # TODO: re-enable this test.
1788    # def test_num_ctx_switches_mocked(self):
1789    #     with mock.patch('psutil._common.open', create=True) as m:
1790    #         self.assertRaises(
1791    #             NotImplementedError,
1792    #             psutil._pslinux.Process(os.getpid()).num_ctx_switches)
1793    #         assert m.called
1794
1795    def test_cmdline_mocked(self):
1796        # see: https://github.com/giampaolo/psutil/issues/639
1797        p = psutil.Process()
1798        fake_file = io.StringIO(u('foo\x00bar\x00'))
1799        with mock.patch('psutil._common.open',
1800                        return_value=fake_file, create=True) as m:
1801            self.assertEqual(p.cmdline(), ['foo', 'bar'])
1802            assert m.called
1803        fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
1804        with mock.patch('psutil._common.open',
1805                        return_value=fake_file, create=True) as m:
1806            self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
1807            assert m.called
1808
1809    def test_cmdline_spaces_mocked(self):
1810        # see: https://github.com/giampaolo/psutil/issues/1179
1811        p = psutil.Process()
1812        fake_file = io.StringIO(u('foo bar '))
1813        with mock.patch('psutil._common.open',
1814                        return_value=fake_file, create=True) as m:
1815            self.assertEqual(p.cmdline(), ['foo', 'bar'])
1816            assert m.called
1817        fake_file = io.StringIO(u('foo bar  '))
1818        with mock.patch('psutil._common.open',
1819                        return_value=fake_file, create=True) as m:
1820            self.assertEqual(p.cmdline(), ['foo', 'bar', ''])
1821            assert m.called
1822
1823    def test_cmdline_mixed_separators(self):
1824        # https://github.com/giampaolo/psutil/issues/
1825        #    1179#issuecomment-552984549
1826        p = psutil.Process()
1827        fake_file = io.StringIO(u('foo\x20bar\x00'))
1828        with mock.patch('psutil._common.open',
1829                        return_value=fake_file, create=True) as m:
1830            self.assertEqual(p.cmdline(), ['foo', 'bar'])
1831            assert m.called
1832
1833    def test_readlink_path_deleted_mocked(self):
1834        with mock.patch('psutil._pslinux.os.readlink',
1835                        return_value='/home/foo (deleted)'):
1836            self.assertEqual(psutil.Process().exe(), "/home/foo")
1837            self.assertEqual(psutil.Process().cwd(), "/home/foo")
1838
1839    def test_threads_mocked(self):
1840        # Test the case where os.listdir() returns a file (thread)
1841        # which no longer exists by the time we open() it (race
1842        # condition). threads() is supposed to ignore that instead
1843        # of raising NSP.
1844        def open_mock(name, *args, **kwargs):
1845            if name.startswith('/proc/%s/task' % os.getpid()):
1846                raise IOError(errno.ENOENT, "")
1847            else:
1848                return orig_open(name, *args, **kwargs)
1849
1850        orig_open = open
1851        patch_point = 'builtins.open' if PY3 else '__builtin__.open'
1852        with mock.patch(patch_point, side_effect=open_mock) as m:
1853            ret = psutil.Process().threads()
1854            assert m.called
1855            self.assertEqual(ret, [])
1856
1857        # ...but if it bumps into something != ENOENT we want an
1858        # exception.
1859        def open_mock(name, *args, **kwargs):
1860            if name.startswith('/proc/%s/task' % os.getpid()):
1861                raise IOError(errno.EPERM, "")
1862            else:
1863                return orig_open(name, *args, **kwargs)
1864
1865        with mock.patch(patch_point, side_effect=open_mock):
1866            self.assertRaises(psutil.AccessDenied, psutil.Process().threads)
1867
1868    def test_exe_mocked(self):
1869        with mock.patch('psutil._pslinux.readlink',
1870                        side_effect=OSError(errno.ENOENT, "")) as m1:
1871            with mock.patch('psutil.Process.cmdline',
1872                            side_effect=psutil.AccessDenied(0, "")) as m2:
1873                # No such file error; might be raised also if /proc/pid/exe
1874                # path actually exists for system processes with low pids
1875                # (about 0-20). In this case psutil is supposed to return
1876                # an empty string.
1877                ret = psutil.Process().exe()
1878                assert m1.called
1879                assert m2.called
1880                self.assertEqual(ret, "")
1881
1882                # ...but if /proc/pid no longer exist we're supposed to treat
1883                # it as an alias for zombie process
1884                with mock.patch('psutil._pslinux.os.path.lexists',
1885                                return_value=False):
1886                    self.assertRaises(
1887                        psutil.ZombieProcess, psutil.Process().exe)
1888
1889    def test_issue_1014(self):
1890        # Emulates a case where smaps file does not exist. In this case
1891        # wrap_exception decorator should not raise NoSuchProcess.
1892        with mock_open_exception(
1893                '/proc/%s/smaps' % os.getpid(),
1894                IOError(errno.ENOENT, "")) as m:
1895            p = psutil.Process()
1896            with self.assertRaises(FileNotFoundError):
1897                p.memory_maps()
1898            assert m.called
1899
1900    @unittest.skipIf(not HAS_RLIMIT, "not supported")
1901    def test_rlimit_zombie(self):
1902        # Emulate a case where rlimit() raises ENOSYS, which may
1903        # happen in case of zombie process:
1904        # https://travis-ci.org/giampaolo/psutil/jobs/51368273
1905        with mock.patch("psutil._pslinux.cext.linux_prlimit",
1906                        side_effect=OSError(errno.ENOSYS, "")) as m:
1907            p = psutil.Process()
1908            p.name()
1909            with self.assertRaises(psutil.ZombieProcess) as exc:
1910                p.rlimit(psutil.RLIMIT_NOFILE)
1911            assert m.called
1912        self.assertEqual(exc.exception.pid, p.pid)
1913        self.assertEqual(exc.exception.name, p.name())
1914
1915    def test_cwd_zombie(self):
1916        with mock.patch("psutil._pslinux.os.readlink",
1917                        side_effect=OSError(errno.ENOENT, "")) as m:
1918            p = psutil.Process()
1919            p.name()
1920            with self.assertRaises(psutil.ZombieProcess) as exc:
1921                p.cwd()
1922            assert m.called
1923        self.assertEqual(exc.exception.pid, p.pid)
1924        self.assertEqual(exc.exception.name, p.name())
1925
1926    def test_stat_file_parsing(self):
1927        from psutil._pslinux import CLOCK_TICKS
1928
1929        args = [
1930            "0",      # pid
1931            "(cat)",  # name
1932            "Z",      # status
1933            "1",      # ppid
1934            "0",      # pgrp
1935            "0",      # session
1936            "0",      # tty
1937            "0",      # tpgid
1938            "0",      # flags
1939            "0",      # minflt
1940            "0",      # cminflt
1941            "0",      # majflt
1942            "0",      # cmajflt
1943            "2",      # utime
1944            "3",      # stime
1945            "4",      # cutime
1946            "5",      # cstime
1947            "0",      # priority
1948            "0",      # nice
1949            "0",      # num_threads
1950            "0",      # itrealvalue
1951            "6",      # starttime
1952            "0",      # vsize
1953            "0",      # rss
1954            "0",      # rsslim
1955            "0",      # startcode
1956            "0",      # endcode
1957            "0",      # startstack
1958            "0",      # kstkesp
1959            "0",      # kstkeip
1960            "0",      # signal
1961            "0",      # blocked
1962            "0",      # sigignore
1963            "0",      # sigcatch
1964            "0",      # wchan
1965            "0",      # nswap
1966            "0",      # cnswap
1967            "0",      # exit_signal
1968            "6",      # processor
1969            "0",      # rt priority
1970            "0",      # policy
1971            "7",      # delayacct_blkio_ticks
1972        ]
1973        content = " ".join(args).encode()
1974        with mock_open_content('/proc/%s/stat' % os.getpid(), content):
1975            p = psutil.Process()
1976            self.assertEqual(p.name(), 'cat')
1977            self.assertEqual(p.status(), psutil.STATUS_ZOMBIE)
1978            self.assertEqual(p.ppid(), 1)
1979            self.assertEqual(
1980                p.create_time(), 6 / CLOCK_TICKS + psutil.boot_time())
1981            cpu = p.cpu_times()
1982            self.assertEqual(cpu.user, 2 / CLOCK_TICKS)
1983            self.assertEqual(cpu.system, 3 / CLOCK_TICKS)
1984            self.assertEqual(cpu.children_user, 4 / CLOCK_TICKS)
1985            self.assertEqual(cpu.children_system, 5 / CLOCK_TICKS)
1986            self.assertEqual(cpu.iowait, 7 / CLOCK_TICKS)
1987            self.assertEqual(p.cpu_num(), 6)
1988
1989    def test_status_file_parsing(self):
1990        with mock_open_content(
1991            '/proc/%s/status' % os.getpid(),
1992            textwrap.dedent("""\
1993                Uid:\t1000\t1001\t1002\t1003
1994                Gid:\t1004\t1005\t1006\t1007
1995                Threads:\t66
1996                Cpus_allowed:\tf
1997                Cpus_allowed_list:\t0-7
1998                voluntary_ctxt_switches:\t12
1999                nonvoluntary_ctxt_switches:\t13""").encode()):
2000            p = psutil.Process()
2001            self.assertEqual(p.num_ctx_switches().voluntary, 12)
2002            self.assertEqual(p.num_ctx_switches().involuntary, 13)
2003            self.assertEqual(p.num_threads(), 66)
2004            uids = p.uids()
2005            self.assertEqual(uids.real, 1000)
2006            self.assertEqual(uids.effective, 1001)
2007            self.assertEqual(uids.saved, 1002)
2008            gids = p.gids()
2009            self.assertEqual(gids.real, 1004)
2010            self.assertEqual(gids.effective, 1005)
2011            self.assertEqual(gids.saved, 1006)
2012            self.assertEqual(p._proc._get_eligible_cpus(), list(range(0, 8)))
2013
2014
2015@unittest.skipIf(not LINUX, "LINUX only")
2016class TestProcessAgainstStatus(unittest.TestCase):
2017    """/proc/pid/stat and /proc/pid/status have many values in common.
2018    Whenever possible, psutil uses /proc/pid/stat (it's faster).
2019    For all those cases we check that the value found in
2020    /proc/pid/stat (by psutil) matches the one found in
2021    /proc/pid/status.
2022    """
2023
2024    @classmethod
2025    def setUpClass(cls):
2026        cls.proc = psutil.Process()
2027
2028    def read_status_file(self, linestart):
2029        with psutil._psplatform.open_text(
2030                '/proc/%s/status' % self.proc.pid) as f:
2031            for line in f:
2032                line = line.strip()
2033                if line.startswith(linestart):
2034                    value = line.partition('\t')[2]
2035                    try:
2036                        return int(value)
2037                    except ValueError:
2038                        return value
2039            raise ValueError("can't find %r" % linestart)
2040
2041    def test_name(self):
2042        value = self.read_status_file("Name:")
2043        self.assertEqual(self.proc.name(), value)
2044
2045    def test_status(self):
2046        value = self.read_status_file("State:")
2047        value = value[value.find('(') + 1:value.rfind(')')]
2048        value = value.replace(' ', '-')
2049        self.assertEqual(self.proc.status(), value)
2050
2051    def test_ppid(self):
2052        value = self.read_status_file("PPid:")
2053        self.assertEqual(self.proc.ppid(), value)
2054
2055    def test_num_threads(self):
2056        value = self.read_status_file("Threads:")
2057        self.assertEqual(self.proc.num_threads(), value)
2058
2059    def test_uids(self):
2060        value = self.read_status_file("Uid:")
2061        value = tuple(map(int, value.split()[1:4]))
2062        self.assertEqual(self.proc.uids(), value)
2063
2064    def test_gids(self):
2065        value = self.read_status_file("Gid:")
2066        value = tuple(map(int, value.split()[1:4]))
2067        self.assertEqual(self.proc.gids(), value)
2068
2069    @retry_on_failure()
2070    def test_num_ctx_switches(self):
2071        value = self.read_status_file("voluntary_ctxt_switches:")
2072        self.assertEqual(self.proc.num_ctx_switches().voluntary, value)
2073        value = self.read_status_file("nonvoluntary_ctxt_switches:")
2074        self.assertEqual(self.proc.num_ctx_switches().involuntary, value)
2075
2076    def test_cpu_affinity(self):
2077        value = self.read_status_file("Cpus_allowed_list:")
2078        if '-' in str(value):
2079            min_, max_ = map(int, value.split('-'))
2080            self.assertEqual(
2081                self.proc.cpu_affinity(), list(range(min_, max_ + 1)))
2082
2083    def test_cpu_affinity_eligible_cpus(self):
2084        value = self.read_status_file("Cpus_allowed_list:")
2085        with mock.patch("psutil._pslinux.per_cpu_times") as m:
2086            self.proc._proc._get_eligible_cpus()
2087        if '-' in str(value):
2088            assert not m.called
2089        else:
2090            assert m.called
2091
2092
2093# =====================================================================
2094# --- test utils
2095# =====================================================================
2096
2097
2098@unittest.skipIf(not LINUX, "LINUX only")
2099class TestUtils(unittest.TestCase):
2100
2101    def test_readlink(self):
2102        with mock.patch("os.readlink", return_value="foo (deleted)") as m:
2103            self.assertEqual(psutil._psplatform.readlink("bar"), "foo")
2104            assert m.called
2105
2106    def test_cat(self):
2107        fname = os.path.abspath(TESTFN)
2108        with open(fname, "wt") as f:
2109            f.write("foo ")
2110        self.assertEqual(psutil._psplatform.cat(TESTFN, binary=False), "foo")
2111        self.assertEqual(psutil._psplatform.cat(TESTFN, binary=True), b"foo")
2112        self.assertEqual(
2113            psutil._psplatform.cat(TESTFN + '??', fallback="bar"), "bar")
2114
2115
2116if __name__ == '__main__':
2117    from psutil.tests.runner import run
2118    run(__file__)
2119