1#!/usr/bin/env python3
2
3# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
4# Use of this source code is governed by a BSD-style license that can be
5# found in the LICENSE file.
6
7# TODO: (FreeBSD) add test for comparing connections with 'sockstat' cmd.
8
9
10"""Tests specific to all BSD platforms."""
11
12
13import datetime
14import os
15import re
16import time
17
18import psutil
19from psutil import BSD
20from psutil import FREEBSD
21from psutil import NETBSD
22from psutil import OPENBSD
23from psutil.tests import spawn_testproc
24from psutil.tests import HAS_BATTERY
25from psutil.tests import PsutilTestCase
26from psutil.tests import retry_on_failure
27from psutil.tests import sh
28from psutil.tests import TOLERANCE_SYS_MEM
29from psutil.tests import terminate
30from psutil.tests import unittest
31from psutil.tests import which
32
33
# Values used on non-BSD platforms; overridden below when running on BSD.
PAGESIZE = None
MUSE_AVAILABLE = False

if BSD:
    from psutil._psutil_posix import getpagesize

    PAGESIZE = getpagesize()
    # muse requires root privileges, so both conditions must hold: the
    # current uid is 0 and which('muse') finds the command.
    MUSE_AVAILABLE = bool(os.getuid() == 0 and which('muse'))
43
44
def sysctl(cmdline):
    """Run ``sysctl <cmdline>`` and return only the value of interest.

    The text up to and including the first ": " (FreeBSD) or "="
    (OpenBSD, NetBSD) is stripped from the command output. The
    remainder is returned as an int when it parses as one, otherwise
    as the string itself.
    """
    result = sh("sysctl " + cmdline)
    if FREEBSD:
        separator = ": "
    elif OPENBSD or NETBSD:
        separator = "="
    else:
        separator = None
    if separator is not None:
        # partition() leaves the output untouched when the separator is
        # missing (e.g. "sysctl -n hw.ncpu"), whereas a find()-based
        # slice would chop off the first character of the value.
        _, found, value = result.partition(separator)
        if found:
            result = value
    try:
        return int(result)
    except ValueError:
        return result
58
59
def muse(field):
    """Run the 'muse' command and return the number found on the
    first output line starting with *field*.

    The number is the second whitespace-separated column of that line,
    converted to int. Raise ValueError if no line starts with *field*.
    """
    for row in sh('muse').split('\n'):
        if row.startswith(field):
            return int(row.split()[1])
    raise ValueError("line not found")
69
70
71# =====================================================================
72# --- All BSD*
73# =====================================================================
74
75
@unittest.skipIf(not BSD, "BSD only")
class BSDTestCase(PsutilTestCase):
    """Tests which apply to every BSD flavour (FreeBSD, OpenBSD, NetBSD)."""

    @classmethod
    def setUpClass(cls):
        # A single helper subprocess is shared by all tests of the class.
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    @unittest.skipIf(NETBSD, "-o lstart doesn't work on NETBSD")
    def test_process_create_time(self):
        # Compare create_time() with the start date printed by ps,
        # both rendered with the same strftime layout.
        ps_out = sh("ps -o lstart -p %s" % self.pid)
        expected = ps_out.replace('STARTED', '').strip()
        ctime = psutil.Process(self.pid).create_time()
        actual = time.strftime("%a %b %e %H:%M:%S %Y", time.localtime(ctime))
        self.assertEqual(expected, actual)

    def test_disks(self):
        # Check psutil.disk_partitions() and psutil.disk_usage()
        # against the output of "df -k".
        tolerance = 10 * 1024 * 1024  # 10 MB

        def df(path):
            # Row 0 is the header, row 1 the data for *path*; df -k
            # reports sizes in 1 KB blocks, hence the conversion to
            # bytes.
            rows = sh('df -k "%s"' % path).strip().split('\n')
            dev, total, used, free = rows[1].split()[:4]
            if dev == 'none':
                dev = ''
            return dev, int(total) * 1024, int(used) * 1024, int(free) * 1024

        for part in psutil.disk_partitions(all=False):
            usage = psutil.disk_usage(part.mountpoint)
            dev, total, used, free = df(part.mountpoint)
            self.assertEqual(part.device, dev)
            self.assertEqual(usage.total, total)
            # free / used may change between the two measurements, so
            # they are only required to be within the tolerance.
            pairs = ((usage.free, free), (usage.used, used))
            for psutil_value, df_value in pairs:
                if abs(psutil_value - df_value) > tolerance:
                    self.fail("psutil=%s, df=%s" % (psutil_value, df_value))

    @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
    def test_cpu_count_logical(self):
        expected = sysctl("hw.ncpu")
        self.assertEqual(psutil.cpu_count(logical=True), expected)

    @unittest.skipIf(not which('sysctl'), "sysctl cmd not available")
    def test_virtual_memory_total(self):
        physmem = sysctl('hw.physmem')
        self.assertEqual(physmem, psutil.virtual_memory().total)

    def test_net_if_stats(self):
        # Compare the "up" flag and the MTU of each NIC with ifconfig.
        for name, stats in psutil.net_if_stats().items():
            try:
                out = sh("ifconfig %s" % name)
            except RuntimeError:
                # The command failed for this interface: nothing to
                # compare against, move on to the next one.
                continue
            self.assertEqual(stats.isup, 'RUNNING' in out, msg=out)
            if "mtu" in out:
                mtu = int(re.findall(r'mtu (\d+)', out)[0])
                self.assertEqual(stats.mtu, mtu)
145
146
147# =====================================================================
148# --- FreeBSD
149# =====================================================================
150
151
@unittest.skipIf(not FREEBSD, "FREEBSD only")
class FreeBSDPsutilTestCase(PsutilTestCase):
    """Compare psutil.Process results against the 'procstat' utility."""

    @classmethod
    def setUpClass(cls):
        # A single helper subprocess is shared by all tests of the class.
        cls.pid = spawn_testproc().pid

    @classmethod
    def tearDownClass(cls):
        terminate(cls.pid)

    @staticmethod
    def _procstat_time(value):
        """Convert a colon-separated time string into float seconds.

        Accepts "SS.ffffff", "MM:SS.ffffff" or "HH:MM:SS.ffffff"
        (NOTE(review): the latter is assumed to be the layout printed
        by 'procstat -r' - confirm against procstat(1)). Unlike keeping
        only the digits after the last '.', this does not discard whole
        seconds, minutes and hours.
        """
        secs = 0.0
        for part in value.split(':'):
            secs = secs * 60 + float(part)
        return secs

    @retry_on_failure()
    def test_memory_maps(self):
        out = sh('procstat -v %s' % self.pid)
        maps = psutil.Process(self.pid).memory_maps(grouped=False)
        # Drop the header line, then consume both listings from the
        # last entry backwards so that rows are compared pairwise.
        lines = out.split('\n')[1:]
        while lines:
            fields = lines.pop().split()
            # Columns 1-4 are unpacked as start, end, (unused), resident.
            start, stop, _, res = fields[1:5]
            mmap_entry = maps.pop()
            self.assertEqual("%s-%s" % (start, stop), mmap_entry.addr)
            self.assertEqual(int(res), mmap_entry.rss)
            # Paths starting with '[' are not compared with procstat.
            if not mmap_entry.path.startswith('['):
                self.assertEqual(fields[10], mmap_entry.path)

    def test_exe(self):
        # The executable path is the last column of the first data row.
        out = sh('procstat -b %s' % self.pid)
        self.assertEqual(psutil.Process(self.pid).exe(),
                         out.split('\n')[1].split()[-1])

    def test_cmdline(self):
        # Everything after the first two columns is the command line.
        out = sh('procstat -c %s' % self.pid)
        self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()),
                         ' '.join(out.split('\n')[1].split()[2:]))

    def test_uids_gids(self):
        out = sh('procstat -s %s' % self.pid)
        euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
        p = psutil.Process(self.pid)
        uids = p.uids()
        gids = p.gids()
        self.assertEqual(uids.real, int(ruid))
        self.assertEqual(uids.effective, int(euid))
        self.assertEqual(uids.saved, int(suid))
        self.assertEqual(gids.real, int(rgid))
        self.assertEqual(gids.effective, int(egid))
        self.assertEqual(gids.saved, int(sgid))

    @retry_on_failure()
    def test_ctx_switches(self):
        out = sh('procstat -r %s' % self.pid)
        p = psutil.Process(self.pid)
        # Track which rows were seen so that a repeated row cannot
        # stand in for a missing one.
        found = set()
        for line in out.split('\n'):
            line = line.lower().strip()
            # The leading space prevents "voluntary context" from
            # matching inside "involuntary context".
            if ' voluntary context' in line:
                pstat_value = int(line.split()[-1])
                psutil_value = p.num_ctx_switches().voluntary
                self.assertEqual(pstat_value, psutil_value)
                found.add('voluntary')
            elif ' involuntary context' in line:
                pstat_value = int(line.split()[-1])
                psutil_value = p.num_ctx_switches().involuntary
                self.assertEqual(pstat_value, psutil_value)
                found.add('involuntary')
        if len(found) != 2:
            raise RuntimeError("couldn't find lines match in procstat out")

    @retry_on_failure()
    def test_cpu_times(self):
        out = sh('procstat -r %s' % self.pid)
        p = psutil.Process(self.pid)
        # Track which rows were seen so that a repeated row cannot
        # stand in for a missing one.
        found = set()
        for line in out.split('\n'):
            line = line.lower().strip()
            if 'user time' in line:
                pstat_value = self._procstat_time(line.split()[-1])
                psutil_value = p.cpu_times().user
                self.assertEqual(pstat_value, psutil_value)
                found.add('user')
            elif 'system time' in line:
                pstat_value = self._procstat_time(line.split()[-1])
                psutil_value = p.cpu_times().system
                self.assertEqual(pstat_value, psutil_value)
                found.add('system')
        if len(found) != 2:
            raise RuntimeError("couldn't find lines match in procstat out")
240
241
@unittest.skipIf(not FREEBSD, "FREEBSD only")
class FreeBSDSystemTestCase(PsutilTestCase):
    """Compare psutil system-wide results against FreeBSD command line
    tools (sysctl, muse, swapinfo, acpiconf).
    """

    @staticmethod
    def parse_swapinfo():
        """Return a (total, used, free) tuple of swap sizes in bytes,
        as reported by 'swapinfo -k'.

        Raise ValueError if the last output line has fewer than four
        columns or its size columns are not integers.
        """
        # the last line is always the total
        output = sh("swapinfo -k").splitlines()[-1]
        parts = output.split()

        # Columns 1-3 are read as total, used and free; the first one
        # is skipped.
        if len(parts) < 4:
            raise ValueError("Can't parse swapinfo: %s" % output)

        # the size is in 1k units, so multiply by 1024
        total, used, free = (int(p) * 1024 for p in parts[1:4])
        return total, used, free

    def test_cpu_frequency_against_sysctl(self):
        # Currently only the frequency of cpu 0 is supported on FreeBSD;
        # all other cores use the same frequency.
        sensor = "dev.cpu.0.freq"
        try:
            sysctl_result = int(sysctl(sensor))
        except RuntimeError:
            self.skipTest("frequencies not supported by kernel")
        self.assertEqual(psutil.cpu_freq().current, sysctl_result)

        sensor = "dev.cpu.0.freq_levels"
        sysctl_result = sysctl(sensor)
        # sysctl returns a string of the format:
        # <freq_level_1>/<voltage_level_1> <freq_level_2>/<voltage_level_2>...
        # Ordered highest available to lowest available.
        levels = sysctl_result.split()
        max_freq = int(levels[0].split("/")[0])
        min_freq = int(levels[-1].split("/")[0])
        self.assertEqual(psutil.cpu_freq().max, max_freq)
        self.assertEqual(psutil.cpu_freq().min, min_freq)

    # --- virtual_memory(); tests against sysctl
    # (page counts are converted to bytes by multiplying by PAGESIZE)

    @retry_on_failure()
    def test_vmem_active(self):
        syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().active, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_inactive(self):
        syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_wired(self):
        syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_cached(self):
        syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_free(self):
        syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
        self.assertAlmostEqual(psutil.virtual_memory().free, syst,
                               delta=TOLERANCE_SYS_MEM)

    @retry_on_failure()
    def test_vmem_buffers(self):
        syst = sysctl("vfs.bufspace")
        self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
                               delta=TOLERANCE_SYS_MEM)

    # --- virtual_memory(); tests against muse

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    def test_muse_vmem_total(self):
        num = muse('Total')
        self.assertEqual(psutil.virtual_memory().total, num)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_active(self):
        num = muse('Active')
        self.assertAlmostEqual(psutil.virtual_memory().active, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_inactive(self):
        num = muse('Inactive')
        self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_wired(self):
        num = muse('Wired')
        self.assertAlmostEqual(psutil.virtual_memory().wired, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_cached(self):
        num = muse('Cache')
        self.assertAlmostEqual(psutil.virtual_memory().cached, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_free(self):
        num = muse('Free')
        self.assertAlmostEqual(psutil.virtual_memory().free, num,
                               delta=TOLERANCE_SYS_MEM)

    @unittest.skipIf(not MUSE_AVAILABLE, "muse not installed")
    @retry_on_failure()
    def test_muse_vmem_buffers(self):
        num = muse('Buffer')
        self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
                               delta=TOLERANCE_SYS_MEM)

    # --- cpu_stats(); counters keep growing between the two reads,
    # hence the tolerances

    def test_cpu_stats_ctx_switches(self):
        self.assertAlmostEqual(psutil.cpu_stats().ctx_switches,
                               sysctl('vm.stats.sys.v_swtch'), delta=1000)

    def test_cpu_stats_interrupts(self):
        self.assertAlmostEqual(psutil.cpu_stats().interrupts,
                               sysctl('vm.stats.sys.v_intr'), delta=1000)

    def test_cpu_stats_soft_interrupts(self):
        self.assertAlmostEqual(psutil.cpu_stats().soft_interrupts,
                               sysctl('vm.stats.sys.v_soft'), delta=1000)

    @retry_on_failure()
    def test_cpu_stats_syscalls(self):
        # pretty high tolerance but it looks like it's OK.
        self.assertAlmostEqual(psutil.cpu_stats().syscalls,
                               sysctl('vm.stats.sys.v_syscall'), delta=200000)

    # def test_cpu_stats_traps(self):
    #    self.assertAlmostEqual(psutil.cpu_stats().traps,
    #                           sysctl('vm.stats.sys.v_trap'), delta=1000)

    # --- swap memory

    def test_swapmem_free(self):
        _, _, free = self.parse_swapinfo()
        self.assertAlmostEqual(
            psutil.swap_memory().free, free, delta=TOLERANCE_SYS_MEM)

    def test_swapmem_used(self):
        _, used, _ = self.parse_swapinfo()
        self.assertAlmostEqual(
            psutil.swap_memory().used, used, delta=TOLERANCE_SYS_MEM)

    def test_swapmem_total(self):
        total, _, _ = self.parse_swapinfo()
        self.assertAlmostEqual(
            psutil.swap_memory().total, total, delta=TOLERANCE_SYS_MEM)

    # --- others

    def test_boot_time(self):
        # The sysctl() helper already prepends the "sysctl" command
        # name, so only the OID is passed. The returned text is
        # expected to contain " sec = <epoch>," from which the epoch
        # is extracted.
        s = sysctl('kern.boottime')
        s = s[s.find(" sec = ") + 7:]
        s = s[:s.find(',')]
        btime = int(s)
        self.assertEqual(btime, psutil.boot_time())

    # --- sensors_battery

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery(self):
        def secs2hours(secs):
            # Render seconds as "H:MM", dropping the leftover seconds.
            h, m = divmod(secs // 60, 60)
            return "%d:%02d" % (h, m)

        out = sh("acpiconf -i 0")
        # Each row is split on tabs: first column is the key, last
        # column the value.
        fields = {}
        for row in out.split("\n"):
            columns = row.split('\t')
            fields[columns[0]] = columns[-1]
        metrics = psutil.sensors_battery()
        percent = int(fields['Remaining capacity:'].replace('%', ''))
        remaining_time = fields['Remaining time:']
        self.assertEqual(metrics.percent, percent)
        if remaining_time == 'unknown':
            self.assertEqual(metrics.secsleft, psutil.POWER_TIME_UNLIMITED)
        else:
            self.assertEqual(secs2hours(metrics.secsleft), remaining_time)

    @unittest.skipIf(not HAS_BATTERY, "no battery")
    def test_sensors_battery_against_sysctl(self):
        self.assertEqual(psutil.sensors_battery().percent,
                         sysctl("hw.acpi.battery.life"))
        self.assertEqual(psutil.sensors_battery().power_plugged,
                         sysctl("hw.acpi.acline") == 1)
        secsleft = psutil.sensors_battery().secsleft
        if secsleft < 0:
            self.assertEqual(sysctl("hw.acpi.battery.time"), -1)
        else:
            # The sysctl value is multiplied by 60 to compare it with
            # psutil's seconds.
            self.assertEqual(secsleft, sysctl("hw.acpi.battery.time") * 60)

    @unittest.skipIf(HAS_BATTERY, "has battery")
    def test_sensors_battery_no_battery(self):
        # If no battery is present one of these calls is supposed
        # to fail, see:
        # https://github.com/giampaolo/psutil/issues/1074
        # The calls are deliberately grouped in a single block: it is
        # left as soon as the first of them raises.
        with self.assertRaises(RuntimeError):
            sysctl("hw.acpi.battery.life")
            sysctl("hw.acpi.battery.time")
            sysctl("hw.acpi.acline")
        self.assertIsNone(psutil.sensors_battery())

    # --- sensors_temperatures

    def test_sensors_temperatures_against_sysctl(self):
        num_cpus = psutil.cpu_count(True)
        for cpu in range(num_cpus):
            sensor = "dev.cpu.%s.temperature" % cpu
            # sysctl returns a string in the format 46.0C
            try:
                sysctl_result = int(float(sysctl(sensor)[:-1]))
            except RuntimeError:
                self.skipTest("temperatures not supported by kernel")
            self.assertAlmostEqual(
                psutil.sensors_temperatures()["coretemp"][cpu].current,
                sysctl_result, delta=10)

            # Same trailing-unit format as above for the "high" value.
            sensor = "dev.cpu.%s.coretemp.tjmax" % cpu
            sysctl_result = int(float(sysctl(sensor)[:-1]))
            self.assertEqual(
                psutil.sensors_temperatures()["coretemp"][cpu].high,
                sysctl_result)
477
478
479# =====================================================================
480# --- OpenBSD
481# =====================================================================
482
483
@unittest.skipIf(not OPENBSD, "OPENBSD only")
class OpenBSDTestCase(PsutilTestCase):
    """Tests which only apply to OpenBSD."""

    def test_boot_time(self):
        # The sysctl value is parsed as a date string and compared
        # with psutil's boot timestamp converted to a local datetime.
        raw = sysctl('kern.boottime')
        expected = datetime.datetime.strptime(raw, "%a %b %d %H:%M:%S %Y")
        actual = datetime.datetime.fromtimestamp(psutil.boot_time())
        self.assertEqual(expected, actual)
492
493
494# =====================================================================
495# --- NetBSD
496# =====================================================================
497
498
@unittest.skipIf(not NETBSD, "NETBSD only")
class NetBSDTestCase(PsutilTestCase):
    """Compare psutil results against /proc/meminfo and /proc/stat."""

    @staticmethod
    def parse_meminfo(look_for):
        """Return the value of the first /proc/meminfo line starting
        with *look_for*, multiplied by 1024.

        Raise ValueError if no such line exists.
        """
        with open('/proc/meminfo', 'rt') as f:
            for line in f:
                if line.startswith(look_for):
                    return int(line.split()[1]) * 1024
        raise ValueError("can't find %s" % look_for)

    @staticmethod
    def parse_stat(look_for):
        """Return the second column, as an int, of the first /proc/stat
        line starting with *look_for* (a bytes prefix, since the file
        is read in binary mode).

        Raise ValueError if no such line exists.
        """
        with open('/proc/stat', 'rb') as f:
            for line in f:
                if line.startswith(look_for):
                    return int(line.split()[1])
        raise ValueError("couldn't find line")

    def test_vmem_total(self):
        self.assertEqual(
            psutil.virtual_memory().total, self.parse_meminfo("MemTotal:"))

    def test_vmem_free(self):
        self.assertAlmostEqual(
            psutil.virtual_memory().free, self.parse_meminfo("MemFree:"),
            delta=TOLERANCE_SYS_MEM)

    def test_vmem_buffers(self):
        self.assertAlmostEqual(
            psutil.virtual_memory().buffers, self.parse_meminfo("Buffers:"),
            delta=TOLERANCE_SYS_MEM)

    def test_vmem_shared(self):
        self.assertAlmostEqual(
            psutil.virtual_memory().shared, self.parse_meminfo("MemShared:"),
            delta=TOLERANCE_SYS_MEM)

    def test_swapmem_total(self):
        self.assertAlmostEqual(
            psutil.swap_memory().total, self.parse_meminfo("SwapTotal:"),
            delta=TOLERANCE_SYS_MEM)

    def test_swapmem_free(self):
        self.assertAlmostEqual(
            psutil.swap_memory().free, self.parse_meminfo("SwapFree:"),
            delta=TOLERANCE_SYS_MEM)

    def test_swapmem_used(self):
        # Internal consistency check only: used must be total - free.
        smem = psutil.swap_memory()
        self.assertEqual(smem.used, smem.total - smem.free)

    def test_cpu_stats_interrupts(self):
        interrupts = self.parse_stat(b'intr')
        self.assertAlmostEqual(
            psutil.cpu_stats().interrupts, interrupts, delta=1000)

    def test_cpu_stats_ctx_switches(self):
        ctx_switches = self.parse_stat(b'ctxt')
        self.assertAlmostEqual(
            psutil.cpu_stats().ctx_switches, ctx_switches, delta=1000)
564
565
if __name__ == '__main__':
    # Executed as a script: hand this file over to psutil's test runner.
    from psutil.tests import runner

    runner.run_from_name(__file__)
569