1#!/usr/bin/env python3
2
3# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7# TODO: (FreeBSD) add test for comparing connections with 'sockstat' cmd.
8
9
10"""Tests specific to all BSD platforms."""
11
12
13import datetime
14import os
15import re
16import time
17
18import psutil
19from psutil import BSD
20from psutil import FREEBSD
21from psutil import NETBSD
22from psutil import OPENBSD
23from psutil.tests import get_test_subprocess
24from psutil.tests import HAS_BATTERY
25from psutil.tests import MEMORY_TOLERANCE
26from psutil.tests import reap_children
27from psutil.tests import retry_on_failure
28from psutil.tests import sh
29from psutil.tests import unittest
30from psutil.tests import which
31
32
33if BSD:
34    PAGESIZE = os.sysconf("SC_PAGE_SIZE")
35    if os.getuid() == 0:  # muse requires root privileges
36        MUSE_AVAILABLE = which('muse')
37    else:
38        MUSE_AVAILABLE = False
39else:
40    MUSE_AVAILABLE = False
41
42
43def sysctl(cmdline):
44    """Expects a sysctl command with an argument and parse the result
45    returning only the value of interest.
46    """
47    result = sh("sysctl " + cmdline)
48    if FREEBSD:
49        result = result[result.find(": ") + 2:]
50    elif OPENBSD or NETBSD:
51        result = result[result.find("=") + 1:]
52    try:
53        return int(result)
54    except ValueError:
55        return result
56
57
58def muse(field):
59    """Thin wrapper around 'muse' cmdline utility."""
60    out = sh('muse')
61    for line in out.split('\n'):
62        if line.startswith(field):
63            break
64    else:
65        raise ValueError("line not found")
66    return int(line.split()[1])
67
68
69# =====================================================================
70# --- All BSD*
71# =====================================================================
72
73
74@unittest.skipIf(not BSD, "BSD only")
75class BSDTestCase(unittest.TestCase):
76    """Generic tests common to all BSD variants."""
77
78    @classmethod
79    def setUpClass(cls):
80        cls.pid = get_test_subprocess().pid
81
82    @classmethod
83    def tearDownClass(cls):
84        reap_children()
85
86    @unittest.skipIf(NETBSD, "-o lstart doesn't work on NETBSD")
87    def test_process_create_time(self):
88        output = sh("ps -o lstart -p %s" % self.pid)
89        start_ps = output.replace('STARTED', '').strip()
90        start_psutil = psutil.Process(self.pid).create_time()
91        start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
92                                     time.localtime(start_psutil))
93        self.assertEqual(start_ps, start_psutil)
94
95    def test_disks(self):
96        # test psutil.disk_usage() and psutil.disk_partitions()
97        # against "df -a"
98        def df(path):
99            out = sh('df -k "%s"' % path).strip()
100            lines = out.split('\n')
101            lines.pop(0)
102            line = lines.pop(0)
103            dev, total, used, free = line.split()[:4]
104            if dev == 'none':
105                dev = ''
106            total = int(total) * 1024
107            used = int(used) * 1024
108            free = int(free) * 1024
109            return dev, total, used, free
110
111        for part in psutil.disk_partitions(all=False):
112            usage = psutil.disk_usage(part.mountpoint)
113            dev, total, used, free = df(part.mountpoint)
114            self.assertEqual(part.device, dev)
115            self.assertEqual(usage.total, total)
116            # 10 MB tollerance
117            if abs(usage.free - free) > 10 * 1024 * 1024:
118                self.fail("psutil=%s, df=%s" % (usage.free, free))
119            if abs(usage.used - used) > 10 * 1024 * 1024:
120                self.fail("psutil=%s, df=%s" % (usage.used, used))
121
122    @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
123    def test_cpu_count_logical(self):
124        syst = sysctl("hw.ncpu")
125        self.assertEqual(psutil.cpu_count(logical=True), syst)
126
127    @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
128    def test_virtual_memory_total(self):
129        num = sysctl('hw.physmem')
130        self.assertEqual(num, psutil.virtual_memory().total)
131
132    def test_net_if_stats(self):
133        for name, stats in psutil.net_if_stats().items():
134            try:
135                out = sh("ifconfig %s" % name)
136            except RuntimeError:
137                pass
138            else:
139                self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
140                if "mtu" in out:
141                    self.assertEqual(stats.mtu,
142                                     int(re.findall(r'mtu (\d+)', out)[0]))
143
144
145# =====================================================================
146# --- FreeBSD
147# =====================================================================
148
149
150@unittest.skipIf(not FREEBSD, "FREEBSD only")
151class FreeBSDProcessTestCase(unittest.TestCase):
152
153    @classmethod
154    def setUpClass(cls):
155        cls.pid = get_test_subprocess().pid
156
157    @classmethod
158    def tearDownClass(cls):
159        reap_children()
160
161    @retry_on_failure()
162    def test_memory_maps(self):
163        out = sh('procstat -v %s' % self.pid)
164        maps = psutil.Process(self.pid).memory_maps(grouped=False)
165        lines = out.split('\n')[1:]
166        while lines:
167            line = lines.pop()
168            fields = line.split()
169            _, start, stop, perms, res = fields[:5]
170            map = maps.pop()
171            self.assertEqual("%s-%s" % (start, stop), map.addr)
172            self.assertEqual(int(res), map.rss)
173            if not map.path.startswith('['):
174                self.assertEqual(fields[10], map.path)
175
176    def test_exe(self):
177        out = sh('procstat -b %s' % self.pid)
178        self.assertEqual(psutil.Process(self.pid).exe(),
179                         out.split('\n')[1].split()[-1])
180
181    def test_cmdline(self):
182        out = sh('procstat -c %s' % self.pid)
183        self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()),
184                         ' '.join(out.split('\n')[1].split()[2:]))
185
186    def test_uids_gids(self):
187        out = sh('procstat -s %s' % self.pid)
188        euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
189        p = psutil.Process(self.pid)
190        uids = p.uids()
191        gids = p.gids()
192        self.assertEqual(uids.real, int(ruid))
193        self.assertEqual(uids.effective, int(euid))
194        self.assertEqual(uids.saved, int(suid))
195        self.assertEqual(gids.real, int(rgid))
196        self.assertEqual(gids.effective, int(egid))
197        self.assertEqual(gids.saved, int(sgid))
198
199    @retry_on_failure()
200    def test_ctx_switches(self):
201        tested = []
202        out = sh('procstat -r %s' % self.pid)
203        p = psutil.Process(self.pid)
204        for line in out.split('\n'):
205            line = line.lower().strip()
206            if ' voluntary context' in line:
207                pstat_value = int(line.split()[-1])
208                psutil_value = p.num_ctx_switches().voluntary
209                self.assertEqual(pstat_value, psutil_value)
210                tested.append(None)
211            elif ' involuntary context' in line:
212                pstat_value = int(line.split()[-1])
213                psutil_value = p.num_ctx_switches().involuntary
214                self.assertEqual(pstat_value, psutil_value)
215                tested.append(None)
216        if len(tested) != 2:
217            raise RuntimeError("couldn't find lines match in procstat out")
218
219    @retry_on_failure()
220    def test_cpu_times(self):
221        tested = []
222        out = sh('procstat -r %s' % self.pid)
223        p = psutil.Process(self.pid)
224        for line in out.split('\n'):
225            line = line.lower().strip()
226            if 'user time' in line:
227                pstat_value = float('0.' + line.split()[-1].split('.')[-1])
228                psutil_value = p.cpu_times().user
229                self.assertEqual(pstat_value, psutil_value)
230                tested.append(None)
231            elif 'system time' in line:
232                pstat_value = float('0.' + line.split()[-1].split('.')[-1])
233                psutil_value = p.cpu_times().system
234                self.assertEqual(pstat_value, psutil_value)
235                tested.append(None)
236        if len(tested) != 2:
237            raise RuntimeError("couldn't find lines match in procstat out")
238
239
240@unittest.skipIf(not FREEBSD, "FREEBSD only")
241class FreeBSDSystemTestCase(unittest.TestCase):
242
243    @staticmethod
244    def parse_swapinfo():
245        # the last line is always the total
246        output = sh("swapinfo -k").splitlines()[-1]
247        parts = re.split(r'\s+', output)
248
249        if not parts:
250            raise ValueError("Can't parse swapinfo: %s" % output)
251
252        # the size is in 1k units, so multiply by 1024
253        total, used, free = (int(p) * 1024 for p in parts[1:4])
254        return total, used, free
255
256    def test_cpu_frequency_against_sysctl(self):
257        # Currently only cpu 0 is frequency is supported in FreeBSD
258        # All other cores use the same frequency.
259        sensor = "dev.cpu.0.freq"
260        try:
261            sysctl_result = int(sysctl(sensor))
262        except RuntimeError:
263            self.skipTest("frequencies not supported by kernel")
264        self.assertEqual(psutil.cpu_freq().current, sysctl_result)
265
266        sensor = "dev.cpu.0.freq_levels"
267        sysctl_result = sysctl(sensor)
268        # sysctl returns a string of the format:
269        # <freq_level_1>/<voltage_level_1> <freq_level_2>/<voltage_level_2>...
270        # Ordered highest available to lowest available.
271        max_freq = int(sysctl_result.split()[0].split("/")[0])
272        min_freq = int(sysctl_result.split()[-1].split("/")[0])
273        self.assertEqual(psutil.cpu_freq().max, max_freq)
274        self.assertEqual(psutil.cpu_freq().min, min_freq)
275
276    # --- virtual_memory(); tests against sysctl
277
278    @retry_on_failure()
279    def test_vmem_active(self):
280        syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
281        self.assertAlmostEqual(psutil.virtual_memory().active, syst,
282                               delta=MEMORY_TOLERANCE)
283
284    @retry_on_failure()
285    def test_vmem_inactive(self):
286        syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
287        self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
288                               delta=MEMORY_TOLERANCE)
289
290    @retry_on_failure()
291    def test_vmem_wired(self):
292        syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
293        self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
294                               delta=MEMORY_TOLERANCE)
295
296    @retry_on_failure()
297    def test_vmem_cached(self):
298        syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
299        self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
300                               delta=MEMORY_TOLERANCE)
301
302    @retry_on_failure()
303    def test_vmem_free(self):
304        syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
305        self.assertAlmostEqual(psutil.virtual_memory().free, syst,
306                               delta=MEMORY_TOLERANCE)
307
308    @retry_on_failure()
309    def test_vmem_buffers(self):
310        syst = sysctl("vfs.bufspace")
311        self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
312                               delta=MEMORY_TOLERANCE)
313
314    # --- virtual_memory(); tests against muse
315
316    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
317    def test_muse_vmem_total(self):
318        num = muse('Total')
319        self.assertEqual(psutil.virtual_memory().total, num)
320
321    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
322    @retry_on_failure()
323    def test_muse_vmem_active(self):
324        num = muse('Active')
325        self.assertAlmostEqual(psutil.virtual_memory().active, num,
326                               delta=MEMORY_TOLERANCE)
327
328    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
329    @retry_on_failure()
330    def test_muse_vmem_inactive(self):
331        num = muse('Inactive')
332        self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
333                               delta=MEMORY_TOLERANCE)
334
335    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
336    @retry_on_failure()
337    def test_muse_vmem_wired(self):
338        num = muse('Wired')
339        self.assertAlmostEqual(psutil.virtual_memory().wired, num,
340                               delta=MEMORY_TOLERANCE)
341
342    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
343    @retry_on_failure()
344    def test_muse_vmem_cached(self):
345        num = muse('Cache')
346        self.assertAlmostEqual(psutil.virtual_memory().cached, num,
347                               delta=MEMORY_TOLERANCE)
348
349    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
350    @retry_on_failure()
351    def test_muse_vmem_free(self):
352        num = muse('Free')
353        self.assertAlmostEqual(psutil.virtual_memory().free, num,
354                               delta=MEMORY_TOLERANCE)
355
356    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
357    @retry_on_failure()
358    def test_muse_vmem_buffers(self):
359        num = muse('Buffer')
360        self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
361                               delta=MEMORY_TOLERANCE)
362
363    def test_cpu_stats_ctx_switches(self):
364        self.assertAlmostEqual(psutil.cpu_stats().ctx_switches,
365                               sysctl('vm.stats.sys.v_swtch'), delta=1000)
366
367    def test_cpu_stats_interrupts(self):
368        self.assertAlmostEqual(psutil.cpu_stats().interrupts,
369                               sysctl('vm.stats.sys.v_intr'), delta=1000)
370
371    def test_cpu_stats_soft_interrupts(self):
372        self.assertAlmostEqual(psutil.cpu_stats().soft_interrupts,
373                               sysctl('vm.stats.sys.v_soft'), delta=1000)
374
375    @retry_on_failure()
376    def test_cpu_stats_syscalls(self):
377        # pretty high tolerance but it looks like it's OK.
378        self.assertAlmostEqual(psutil.cpu_stats().syscalls,
379                               sysctl('vm.stats.sys.v_syscall'), delta=200000)
380
381    # def test_cpu_stats_traps(self):
382    #    self.assertAlmostEqual(psutil.cpu_stats().traps,
383    #                           sysctl('vm.stats.sys.v_trap'), delta=1000)
384
385    # --- swap memory
386
387    def test_swapmem_free(self):
388        total, used, free = self.parse_swapinfo()
389        self.assertAlmostEqual(
390            psutil.swap_memory().free, free, delta=MEMORY_TOLERANCE)
391
392    def test_swapmem_used(self):
393        total, used, free = self.parse_swapinfo()
394        self.assertAlmostEqual(
395            psutil.swap_memory().used, used, delta=MEMORY_TOLERANCE)
396
397    def test_swapmem_total(self):
398        total, used, free = self.parse_swapinfo()
399        self.assertAlmostEqual(
400            psutil.swap_memory().total, total, delta=MEMORY_TOLERANCE)
401
402    # --- others
403
404    def test_boot_time(self):
405        s = sysctl('sysctl kern.boottime')
406        s = s[s.find(" sec = ") + 7:]
407        s = s[:s.find(',')]
408        btime = int(s)
409        self.assertEqual(btime, psutil.boot_time())
410
411    # --- sensors_battery
412
413    @unittest.skipIf(not HAS_BATTERY, "no battery")
414    def test_sensors_battery(self):
415        def secs2hours(secs):
416            m, s = divmod(secs, 60)
417            h, m = divmod(m, 60)
418            return "%d:%02d" % (h, m)
419
420        out = sh("acpiconf -i 0")
421        fields = dict([(x.split('\t')[0], x.split('\t')[-1])
422                       for x in out.split("\n")])
423        metrics = psutil.sensors_battery()
424        percent = int(fields['Remaining capacity:'].replace('%', ''))
425        remaining_time = fields['Remaining time:']
426        self.assertEqual(metrics.percent, percent)
427        if remaining_time == 'unknown':
428            self.assertEqual(metrics.secsleft, psutil.POWER_TIME_UNLIMITED)
429        else:
430            self.assertEqual(secs2hours(metrics.secsleft), remaining_time)
431
432    @unittest.skipIf(not HAS_BATTERY, "no battery")
433    def test_sensors_battery_against_sysctl(self):
434        self.assertEqual(psutil.sensors_battery().percent,
435                         sysctl("hw.acpi.battery.life"))
436        self.assertEqual(psutil.sensors_battery().power_plugged,
437                         sysctl("hw.acpi.acline") == 1)
438        secsleft = psutil.sensors_battery().secsleft
439        if secsleft < 0:
440            self.assertEqual(sysctl("hw.acpi.battery.time"), -1)
441        else:
442            self.assertEqual(secsleft, sysctl("hw.acpi.battery.time") * 60)
443
444    @unittest.skipIf(HAS_BATTERY, "has battery")
445    def test_sensors_battery_no_battery(self):
446        # If no battery is present one of these calls is supposed
447        # to fail, see:
448        # https://github.com/giampaolo/psutil/issues/1074
449        with self.assertRaises(RuntimeError):
450            sysctl("hw.acpi.battery.life")
451            sysctl("hw.acpi.battery.time")
452            sysctl("hw.acpi.acline")
453        self.assertIsNone(psutil.sensors_battery())
454
455    # --- sensors_temperatures
456
457    def test_sensors_temperatures_against_sysctl(self):
458        num_cpus = psutil.cpu_count(True)
459        for cpu in range(num_cpus):
460            sensor = "dev.cpu.%s.temperature" % cpu
461            # sysctl returns a string in the format 46.0C
462            try:
463                sysctl_result = int(float(sysctl(sensor)[:-1]))
464            except RuntimeError:
465                self.skipTest("temperatures not supported by kernel")
466            self.assertAlmostEqual(
467                psutil.sensors_temperatures()["coretemp"][cpu].current,
468                sysctl_result, delta=10)
469
470            sensor = "dev.cpu.%s.coretemp.tjmax" % cpu
471            sysctl_result = int(float(sysctl(sensor)[:-1]))
472            self.assertEqual(
473                psutil.sensors_temperatures()["coretemp"][cpu].high,
474                sysctl_result)
475
476# =====================================================================
477# --- OpenBSD
478# =====================================================================
479
480
481@unittest.skipIf(not OPENBSD, "OPENBSD only")
482class OpenBSDTestCase(unittest.TestCase):
483
484    def test_boot_time(self):
485        s = sysctl('kern.boottime')
486        sys_bt = datetime.datetime.strptime(s, "%a %b %d %H:%M:%S %Y")
487        psutil_bt = datetime.datetime.fromtimestamp(psutil.boot_time())
488        self.assertEqual(sys_bt, psutil_bt)
489
490
491# =====================================================================
492# --- NetBSD
493# =====================================================================
494
495
496@unittest.skipIf(not NETBSD, "NETBSD only")
497class NetBSDTestCase(unittest.TestCase):
498
499    @staticmethod
500    def parse_meminfo(look_for):
501        with open('/proc/meminfo', 'rt') as f:
502            for line in f:
503                if line.startswith(look_for):
504                    return int(line.split()[1]) * 1024
505        raise ValueError("can't find %s" % look_for)
506
507    def test_vmem_total(self):
508        self.assertEqual(
509            psutil.virtual_memory().total, self.parse_meminfo("MemTotal:"))
510
511    def test_vmem_free(self):
512        self.assertAlmostEqual(
513            psutil.virtual_memory().free, self.parse_meminfo("MemFree:"),
514            delta=MEMORY_TOLERANCE)
515
516    def test_vmem_buffers(self):
517        self.assertAlmostEqual(
518            psutil.virtual_memory().buffers, self.parse_meminfo("Buffers:"),
519            delta=MEMORY_TOLERANCE)
520
521    def test_vmem_shared(self):
522        self.assertAlmostEqual(
523            psutil.virtual_memory().shared, self.parse_meminfo("MemShared:"),
524            delta=MEMORY_TOLERANCE)
525
526    def test_swapmem_total(self):
527        self.assertAlmostEqual(
528            psutil.swap_memory().total, self.parse_meminfo("SwapTotal:"),
529            delta=MEMORY_TOLERANCE)
530
531    def test_swapmem_free(self):
532        self.assertAlmostEqual(
533            psutil.swap_memory().free, self.parse_meminfo("SwapFree:"),
534            delta=MEMORY_TOLERANCE)
535
536    def test_swapmem_used(self):
537        smem = psutil.swap_memory()
538        self.assertEqual(smem.used, smem.total - smem.free)
539
540    def test_cpu_stats_interrupts(self):
541        with open('/proc/stat', 'rb') as f:
542            for line in f:
543                if line.startswith(b'intr'):
544                    interrupts = int(line.split()[1])
545                    break
546            else:
547                raise ValueError("couldn't find line")
548        self.assertAlmostEqual(
549            psutil.cpu_stats().interrupts, interrupts, delta=1000)
550
551    def test_cpu_stats_ctx_switches(self):
552        with open('/proc/stat', 'rb') as f:
553            for line in f:
554                if line.startswith(b'ctxt'):
555                    ctx_switches = int(line.split()[1])
556                    break
557            else:
558                raise ValueError("couldn't find line")
559        self.assertAlmostEqual(
560            psutil.cpu_stats().ctx_switches, ctx_switches, delta=1000)
561
562
563if __name__ == '__main__':
564    from psutil.tests.runner import run
565    run(__file__)
566