diff options
Diffstat (limited to 'python/psutil/test/_bsd.py')
-rw-r--r-- | python/psutil/test/_bsd.py | 252 |
1 files changed, 252 insertions, 0 deletions
diff --git a/python/psutil/test/_bsd.py b/python/psutil/test/_bsd.py new file mode 100644 index 000000000..e4a3225d2 --- /dev/null +++ b/python/psutil/test/_bsd.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +# TODO: add test for comparing connections with 'sockstat' cmd + +"""BSD specific tests. These are implicitly run by test_psutil.py.""" + +import os +import subprocess +import sys +import time + +import psutil + +from psutil._compat import PY3 +from test_psutil import (TOLERANCE, BSD, sh, get_test_subprocess, which, + retry_before_failing, reap_children, unittest) + + +PAGESIZE = os.sysconf("SC_PAGE_SIZE") +if os.getuid() == 0: # muse requires root privileges + MUSE_AVAILABLE = which('muse') +else: + MUSE_AVAILABLE = False + + +def sysctl(cmdline): + """Expects a sysctl command with an argument and parse the result + returning only the value of interest. + """ + result = sh("sysctl " + cmdline) + result = result[result.find(": ") + 2:] + try: + return int(result) + except ValueError: + return result + + +def muse(field): + """Thin wrapper around 'muse' cmdline utility.""" + out = sh('muse') + for line in out.split('\n'): + if line.startswith(field): + break + else: + raise ValueError("line not found") + return int(line.split()[1]) + + +@unittest.skipUnless(BSD, "not a BSD system") +class BSDSpecificTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_boot_time(self): + s = sysctl('sysctl kern.boottime') + s = s[s.find(" sec = ") + 7:] + s = s[:s.find(',')] + btime = int(s) + self.assertEqual(btime, psutil.boot_time()) + + def test_process_create_time(self): + cmdline = "ps -o lstart -p %s" % self.pid + p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) + output = p.communicate()[0] + if PY3: + output = str(output, sys.stdout.encoding) + start_ps = output.replace('STARTED', '').strip() + start_psutil = psutil.Process(self.pid).create_time() + start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", + time.localtime(start_psutil)) + self.assertEqual(start_ps, start_psutil) + + def test_disks(self): + # test psutil.disk_usage() and psutil.disk_partitions() + # against "df -a" + def df(path): + out = sh('df -k "%s"' % path).strip() + lines = out.split('\n') + lines.pop(0) + line = lines.pop(0) + dev, total, used, free = line.split()[:4] + if dev == 'none': + dev = '' + total = int(total) * 1024 + used = int(used) * 1024 + free = int(free) * 1024 + return dev, total, used, free + + for part in psutil.disk_partitions(all=False): + usage = psutil.disk_usage(part.mountpoint) + dev, total, used, free = df(part.mountpoint) + self.assertEqual(part.device, dev) + self.assertEqual(usage.total, total) + # 10 MB tollerance + if abs(usage.free - free) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % (usage.free, free)) + if abs(usage.used - used) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % (usage.used, used)) + + @retry_before_failing() + def test_memory_maps(self): + out = sh('procstat -v %s' % self.pid) + maps = psutil.Process(self.pid).memory_maps(grouped=False) + lines = out.split('\n')[1:] + while lines: + line = lines.pop() + fields = line.split() + _, start, stop, perms, res = fields[:5] + map = maps.pop() + self.assertEqual("%s-%s" % (start, stop), map.addr) + self.assertEqual(int(res), map.rss) + if not map.path.startswith('['): + self.assertEqual(fields[10], map.path) + + def test_exe(self): + out = sh('procstat -b %s' % self.pid) + self.assertEqual(psutil.Process(self.pid).exe(), + out.split('\n')[1].split()[-1]) + + def test_cmdline(self): + out = sh('procstat -c %s' % self.pid) + self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()), + ' '.join(out.split('\n')[1].split()[2:])) + + def test_uids_gids(self): + out = sh('procstat -s %s' % self.pid) + euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8] + p = psutil.Process(self.pid) + uids = p.uids() + gids = p.gids() + self.assertEqual(uids.real, int(ruid)) + self.assertEqual(uids.effective, int(euid)) + self.assertEqual(uids.saved, int(suid)) + self.assertEqual(gids.real, int(rgid)) + self.assertEqual(gids.effective, int(egid)) + self.assertEqual(gids.saved, int(sgid)) + + # --- virtual_memory(); tests against sysctl + + def test_vmem_total(self): + syst = sysctl("sysctl vm.stats.vm.v_page_count") * PAGESIZE + self.assertEqual(psutil.virtual_memory().total, syst) + + @retry_before_failing() + def test_vmem_active(self): + syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().active, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_inactive(self): + syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().inactive, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_wired(self): + syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().wired, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_cached(self): + syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().cached, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_free(self): + syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().free, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_buffers(self): + syst = sysctl("vfs.bufspace") + self.assertAlmostEqual(psutil.virtual_memory().buffers, syst, + delta=TOLERANCE) + + def test_cpu_count_logical(self): + syst = sysctl("hw.ncpu") + self.assertEqual(psutil.cpu_count(logical=True), syst) + + # --- virtual_memory(); tests against muse + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + def test_total(self): + num = muse('Total') + self.assertEqual(psutil.virtual_memory().total, num) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_active(self): + num = muse('Active') + self.assertAlmostEqual(psutil.virtual_memory().active, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_inactive(self): + num = muse('Inactive') + self.assertAlmostEqual(psutil.virtual_memory().inactive, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_wired(self): + num = muse('Wired') + self.assertAlmostEqual(psutil.virtual_memory().wired, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_cached(self): + num = muse('Cache') + self.assertAlmostEqual(psutil.virtual_memory().cached, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_free(self): + num = muse('Free') + self.assertAlmostEqual(psutil.virtual_memory().free, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_buffers(self): + num = muse('Buffer') + self.assertAlmostEqual(psutil.virtual_memory().buffers, num, + delta=TOLERANCE) + + +def main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) |