summaryrefslogtreecommitdiffstats
path: root/python/psutil/test/_osx.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/psutil/test/_osx.py')
-rw-r--r--python/psutil/test/_osx.py160
1 files changed, 160 insertions, 0 deletions
diff --git a/python/psutil/test/_osx.py b/python/psutil/test/_osx.py
new file mode 100644
index 000000000..6e6e4380e
--- /dev/null
+++ b/python/psutil/test/_osx.py
@@ -0,0 +1,160 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""OSX specific tests. These are implicitly run by test_psutil.py."""
+
+import os
+import re
+import subprocess
+import sys
+import time
+
+import psutil
+
+from psutil._compat import PY3
+from test_psutil import (TOLERANCE, OSX, sh, get_test_subprocess,
+ reap_children, retry_before_failing, unittest)
+
+
+PAGESIZE = os.sysconf("SC_PAGE_SIZE")
+
+
+def sysctl(cmdline):
+ """Expects a sysctl command with an argument and parse the result
+ returning only the value of interest.
+ """
+ p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
+ result = p.communicate()[0].strip().split()[1]
+ if PY3:
+ result = str(result, sys.stdout.encoding)
+ try:
+ return int(result)
+ except ValueError:
+ return result
+
+
+def vm_stat(field):
+ """Wrapper around 'vm_stat' cmdline utility."""
+ out = sh('vm_stat')
+ for line in out.split('\n'):
+ if field in line:
+ break
+ else:
+ raise ValueError("line not found")
+ return int(re.search('\d+', line).group(0)) * PAGESIZE
+
+
+@unittest.skipUnless(OSX, "not an OSX system")
+class OSXSpecificTestCase(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls):
+ cls.pid = get_test_subprocess().pid
+
+ @classmethod
+ def tearDownClass(cls):
+ reap_children()
+
+ def test_process_create_time(self):
+ cmdline = "ps -o lstart -p %s" % self.pid
+ p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
+ output = p.communicate()[0]
+ if PY3:
+ output = str(output, sys.stdout.encoding)
+ start_ps = output.replace('STARTED', '').strip()
+ start_psutil = psutil.Process(self.pid).create_time()
+ start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
+ time.localtime(start_psutil))
+ self.assertEqual(start_ps, start_psutil)
+
+ def test_disks(self):
+ # test psutil.disk_usage() and psutil.disk_partitions()
+ # against "df -a"
+ def df(path):
+ out = sh('df -k "%s"' % path).strip()
+ lines = out.split('\n')
+ lines.pop(0)
+ line = lines.pop(0)
+ dev, total, used, free = line.split()[:4]
+ if dev == 'none':
+ dev = ''
+ total = int(total) * 1024
+ used = int(used) * 1024
+ free = int(free) * 1024
+ return dev, total, used, free
+
+ for part in psutil.disk_partitions(all=False):
+ usage = psutil.disk_usage(part.mountpoint)
+ dev, total, used, free = df(part.mountpoint)
+ self.assertEqual(part.device, dev)
+ self.assertEqual(usage.total, total)
+ # 10 MB tollerance
+ if abs(usage.free - free) > 10 * 1024 * 1024:
+ self.fail("psutil=%s, df=%s" % usage.free, free)
+ if abs(usage.used - used) > 10 * 1024 * 1024:
+ self.fail("psutil=%s, df=%s" % usage.used, used)
+
+ # --- virtual mem
+
+ def test_vmem_total(self):
+ sysctl_hwphymem = sysctl('sysctl hw.memsize')
+ self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total)
+
+ @retry_before_failing()
+ def test_vmem_free(self):
+ num = vm_stat("free")
+ self.assertAlmostEqual(psutil.virtual_memory().free, num,
+ delta=TOLERANCE)
+
+ @retry_before_failing()
+ def test_vmem_active(self):
+ num = vm_stat("active")
+ self.assertAlmostEqual(psutil.virtual_memory().active, num,
+ delta=TOLERANCE)
+
+ @retry_before_failing()
+ def test_vmem_inactive(self):
+ num = vm_stat("inactive")
+ self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
+ delta=TOLERANCE)
+
+ @retry_before_failing()
+ def test_vmem_wired(self):
+ num = vm_stat("wired")
+ self.assertAlmostEqual(psutil.virtual_memory().wired, num,
+ delta=TOLERANCE)
+
+ # --- swap mem
+
+ def test_swapmem_sin(self):
+ num = vm_stat("Pageins")
+ self.assertEqual(psutil.swap_memory().sin, num)
+
+ def test_swapmem_sout(self):
+ num = vm_stat("Pageouts")
+ self.assertEqual(psutil.swap_memory().sout, num)
+
+ def test_swapmem_total(self):
+ tot1 = psutil.swap_memory().total
+ tot2 = 0
+ # OSX uses multiple cache files:
+ # http://en.wikipedia.org/wiki/Paging#OS_X
+ for name in os.listdir("/var/vm/"):
+ file = os.path.join("/var/vm", name)
+ if os.path.isfile(file):
+ tot2 += os.path.getsize(file)
+ self.assertEqual(tot1, tot2)
+
+
+def main():
+ test_suite = unittest.TestSuite()
+ test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase))
+ result = unittest.TextTestRunner(verbosity=2).run(test_suite)
+ return result.wasSuccessful()
+
+if __name__ == '__main__':
+ if not main():
+ sys.exit(1)