#!/usr/bin/env python # Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. # Use of this source code is governed by a BSD-style license that can be # found in the LICENSE file. # TODO: add test for comparing connections with 'sockstat' cmd """BSD specific tests. These are implicitly run by test_psutil.py.""" import os import subprocess import sys import time import psutil from psutil._compat import PY3 from test_psutil import (TOLERANCE, BSD, sh, get_test_subprocess, which, retry_before_failing, reap_children, unittest) PAGESIZE = os.sysconf("SC_PAGE_SIZE") if os.getuid() == 0: # muse requires root privileges MUSE_AVAILABLE = which('muse') else: MUSE_AVAILABLE = False def sysctl(cmdline): """Expects a sysctl command with an argument and parse the result returning only the value of interest. """ result = sh("sysctl " + cmdline) result = result[result.find(": ") + 2:] try: return int(result) except ValueError: return result def muse(field): """Thin wrapper around 'muse' cmdline utility.""" out = sh('muse') for line in out.split('\n'): if line.startswith(field): break else: raise ValueError("line not found") return int(line.split()[1]) @unittest.skipUnless(BSD, "not a BSD system") class BSDSpecificTestCase(unittest.TestCase): @classmethod def setUpClass(cls): cls.pid = get_test_subprocess().pid @classmethod def tearDownClass(cls): reap_children() def test_boot_time(self): s = sysctl('sysctl kern.boottime') s = s[s.find(" sec = ") + 7:] s = s[:s.find(',')] btime = int(s) self.assertEqual(btime, psutil.boot_time()) def test_process_create_time(self): cmdline = "ps -o lstart -p %s" % self.pid p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) output = p.communicate()[0] if PY3: output = str(output, sys.stdout.encoding) start_ps = output.replace('STARTED', '').strip() start_psutil = psutil.Process(self.pid).create_time() start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", time.localtime(start_psutil)) self.assertEqual(start_ps, start_psutil) def test_disks(self): # test psutil.disk_usage() and psutil.disk_partitions() # against "df -a" def df(path): out = sh('df -k "%s"' % path).strip() lines = out.split('\n') lines.pop(0) line = lines.pop(0) dev, total, used, free = line.split()[:4] if dev == 'none': dev = '' total = int(total) * 1024 used = int(used) * 1024 free = int(free) * 1024 return dev, total, used, free for part in psutil.disk_partitions(all=False): usage = psutil.disk_usage(part.mountpoint) dev, total, used, free = df(part.mountpoint) self.assertEqual(part.device, dev) self.assertEqual(usage.total, total) # 10 MB tollerance if abs(usage.free - free) > 10 * 1024 * 1024: self.fail("psutil=%s, df=%s" % (usage.free, free)) if abs(usage.used - used) > 10 * 1024 * 1024: self.fail("psutil=%s, df=%s" % (usage.used, used)) @retry_before_failing() def test_memory_maps(self): out = sh('procstat -v %s' % self.pid) maps = psutil.Process(self.pid).memory_maps(grouped=False) lines = out.split('\n')[1:] while lines: line = lines.pop() fields = line.split() _, start, stop, perms, res = fields[:5] map = maps.pop() self.assertEqual("%s-%s" % (start, stop), map.addr) self.assertEqual(int(res), map.rss) if not map.path.startswith('['): self.assertEqual(fields[10], map.path) def test_exe(self): out = sh('procstat -b %s' % self.pid) self.assertEqual(psutil.Process(self.pid).exe(), out.split('\n')[1].split()[-1]) def test_cmdline(self): out = sh('procstat -c %s' % self.pid) self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()), ' '.join(out.split('\n')[1].split()[2:])) def test_uids_gids(self): out = sh('procstat -s %s' % self.pid) euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8] p = psutil.Process(self.pid) uids = p.uids() gids = p.gids() self.assertEqual(uids.real, int(ruid)) self.assertEqual(uids.effective, int(euid)) self.assertEqual(uids.saved, int(suid)) self.assertEqual(gids.real, int(rgid)) self.assertEqual(gids.effective, int(egid)) self.assertEqual(gids.saved, int(sgid)) # --- virtual_memory(); tests against sysctl def test_vmem_total(self): syst = sysctl("sysctl vm.stats.vm.v_page_count") * PAGESIZE self.assertEqual(psutil.virtual_memory().total, syst) @retry_before_failing() def test_vmem_active(self): syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE self.assertAlmostEqual(psutil.virtual_memory().active, syst, delta=TOLERANCE) @retry_before_failing() def test_vmem_inactive(self): syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE self.assertAlmostEqual(psutil.virtual_memory().inactive, syst, delta=TOLERANCE) @retry_before_failing() def test_vmem_wired(self): syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE self.assertAlmostEqual(psutil.virtual_memory().wired, syst, delta=TOLERANCE) @retry_before_failing() def test_vmem_cached(self): syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE self.assertAlmostEqual(psutil.virtual_memory().cached, syst, delta=TOLERANCE) @retry_before_failing() def test_vmem_free(self): syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE self.assertAlmostEqual(psutil.virtual_memory().free, syst, delta=TOLERANCE) @retry_before_failing() def test_vmem_buffers(self): syst = sysctl("vfs.bufspace") self.assertAlmostEqual(psutil.virtual_memory().buffers, syst, delta=TOLERANCE) def test_cpu_count_logical(self): syst = sysctl("hw.ncpu") self.assertEqual(psutil.cpu_count(logical=True), syst) # --- virtual_memory(); tests against muse @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") def test_total(self): num = muse('Total') self.assertEqual(psutil.virtual_memory().total, num) @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") @retry_before_failing() def test_active(self): num = muse('Active') self.assertAlmostEqual(psutil.virtual_memory().active, num, delta=TOLERANCE) @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") @retry_before_failing() def test_inactive(self): num = muse('Inactive') self.assertAlmostEqual(psutil.virtual_memory().inactive, num, delta=TOLERANCE) @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") @retry_before_failing() def test_wired(self): num = muse('Wired') self.assertAlmostEqual(psutil.virtual_memory().wired, num, delta=TOLERANCE) @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") @retry_before_failing() def test_cached(self): num = muse('Cache') self.assertAlmostEqual(psutil.virtual_memory().cached, num, delta=TOLERANCE) @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") @retry_before_failing() def test_free(self): num = muse('Free') self.assertAlmostEqual(psutil.virtual_memory().free, num, delta=TOLERANCE) @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") @retry_before_failing() def test_buffers(self): num = muse('Buffer') self.assertAlmostEqual(psutil.virtual_memory().buffers, num, delta=TOLERANCE) def main(): test_suite = unittest.TestSuite() test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) result = unittest.TextTestRunner(verbosity=2).run(test_suite) return result.wasSuccessful() if __name__ == '__main__': if not main(): sys.exit(1)