diff options
Diffstat (limited to 'python/psutil/test')
-rw-r--r-- | python/psutil/test/README.rst | 21 | ||||
-rw-r--r-- | python/psutil/test/_bsd.py | 252 | ||||
-rw-r--r-- | python/psutil/test/_linux.py | 473 | ||||
-rw-r--r-- | python/psutil/test/_osx.py | 160 | ||||
-rw-r--r-- | python/psutil/test/_posix.py | 258 | ||||
-rw-r--r-- | python/psutil/test/_sunos.py | 48 | ||||
-rw-r--r-- | python/psutil/test/_windows.py | 464 | ||||
-rw-r--r-- | python/psutil/test/test_memory_leaks.py | 445 | ||||
-rw-r--r-- | python/psutil/test/test_psutil.py | 3013 |
9 files changed, 5134 insertions, 0 deletions
diff --git a/python/psutil/test/README.rst b/python/psutil/test/README.rst new file mode 100644 index 000000000..3f2a468ef --- /dev/null +++ b/python/psutil/test/README.rst @@ -0,0 +1,21 @@ +- The recommended way to run tests (also on Windows) is to cd into parent + directory and run ``make test`` + +- Dependencies for running tests: + - python 2.6: ipaddress, mock, unittest2 + - python 2.7: ipaddress, mock + - python 3.2: ipaddress, mock + - python 3.3: ipaddress + - python >= 3.4: no deps required + +- The main test script is ``test_psutil.py``, which also imports platform-specific + ``_*.py`` scripts (which should be ignored). + +- ``test_memory_leaks.py`` looks for memory leaks into C extension modules and must + be run separately with ``make test-memleaks``. + +- To run tests on all supported Python version install tox (pip install tox) + then run ``tox``. + +- Every time a commit is pushed tests are automatically run on Travis: + https://travis-ci.org/giampaolo/psutil/ diff --git a/python/psutil/test/_bsd.py b/python/psutil/test/_bsd.py new file mode 100644 index 000000000..e4a3225d2 --- /dev/null +++ b/python/psutil/test/_bsd.py @@ -0,0 +1,252 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +# TODO: add test for comparing connections with 'sockstat' cmd + +"""BSD specific tests. These are implicitly run by test_psutil.py.""" + +import os +import subprocess +import sys +import time + +import psutil + +from psutil._compat import PY3 +from test_psutil import (TOLERANCE, BSD, sh, get_test_subprocess, which, + retry_before_failing, reap_children, unittest) + + +PAGESIZE = os.sysconf("SC_PAGE_SIZE") +if os.getuid() == 0: # muse requires root privileges + MUSE_AVAILABLE = which('muse') +else: + MUSE_AVAILABLE = False + + +def sysctl(cmdline): + """Expects a sysctl command with an argument and parse the result + returning only the value of interest. + """ + result = sh("sysctl " + cmdline) + result = result[result.find(": ") + 2:] + try: + return int(result) + except ValueError: + return result + + +def muse(field): + """Thin wrapper around 'muse' cmdline utility.""" + out = sh('muse') + for line in out.split('\n'): + if line.startswith(field): + break + else: + raise ValueError("line not found") + return int(line.split()[1]) + + +@unittest.skipUnless(BSD, "not a BSD system") +class BSDSpecificTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_boot_time(self): + s = sysctl('sysctl kern.boottime') + s = s[s.find(" sec = ") + 7:] + s = s[:s.find(',')] + btime = int(s) + self.assertEqual(btime, psutil.boot_time()) + + def test_process_create_time(self): + cmdline = "ps -o lstart -p %s" % self.pid + p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) + output = p.communicate()[0] + if PY3: + output = str(output, sys.stdout.encoding) + start_ps = output.replace('STARTED', '').strip() + start_psutil = psutil.Process(self.pid).create_time() + start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", + time.localtime(start_psutil)) + self.assertEqual(start_ps, start_psutil) + + def test_disks(self): + # test psutil.disk_usage() and psutil.disk_partitions() + # against "df -a" + def df(path): + out = sh('df -k "%s"' % path).strip() + lines = out.split('\n') + lines.pop(0) + line = lines.pop(0) + dev, total, used, free = line.split()[:4] + if dev == 'none': + dev = '' + total = int(total) * 1024 + used = int(used) * 1024 + free = int(free) * 1024 + return dev, total, used, free + + for part in psutil.disk_partitions(all=False): + usage = psutil.disk_usage(part.mountpoint) + dev, total, used, free = df(part.mountpoint) + self.assertEqual(part.device, dev) + self.assertEqual(usage.total, total) + # 10 MB tollerance + if abs(usage.free - free) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % (usage.free, free)) + if abs(usage.used - used) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % (usage.used, used)) + + @retry_before_failing() + def test_memory_maps(self): + out = sh('procstat -v %s' % self.pid) + maps = psutil.Process(self.pid).memory_maps(grouped=False) + lines = out.split('\n')[1:] + while lines: + line = lines.pop() + fields = line.split() + _, start, stop, perms, res = fields[:5] + map = maps.pop() + self.assertEqual("%s-%s" % (start, stop), map.addr) + self.assertEqual(int(res), map.rss) + if not map.path.startswith('['): + self.assertEqual(fields[10], map.path) + + def test_exe(self): + out = sh('procstat -b %s' % self.pid) + self.assertEqual(psutil.Process(self.pid).exe(), + out.split('\n')[1].split()[-1]) + + def test_cmdline(self): + out = sh('procstat -c %s' % self.pid) + self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()), + ' '.join(out.split('\n')[1].split()[2:])) + + def test_uids_gids(self): + out = sh('procstat -s %s' % self.pid) + euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8] + p = psutil.Process(self.pid) + uids = p.uids() + gids = p.gids() + self.assertEqual(uids.real, int(ruid)) + self.assertEqual(uids.effective, int(euid)) + self.assertEqual(uids.saved, int(suid)) + self.assertEqual(gids.real, int(rgid)) + self.assertEqual(gids.effective, int(egid)) + self.assertEqual(gids.saved, int(sgid)) + + # --- virtual_memory(); tests against sysctl + + def test_vmem_total(self): + syst = sysctl("sysctl vm.stats.vm.v_page_count") * PAGESIZE + self.assertEqual(psutil.virtual_memory().total, syst) + + @retry_before_failing() + def test_vmem_active(self): + syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().active, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_inactive(self): + syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().inactive, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_wired(self): + syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().wired, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_cached(self): + syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().cached, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_free(self): + syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE + self.assertAlmostEqual(psutil.virtual_memory().free, syst, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_buffers(self): + syst = sysctl("vfs.bufspace") + self.assertAlmostEqual(psutil.virtual_memory().buffers, syst, + delta=TOLERANCE) + + def test_cpu_count_logical(self): + syst = sysctl("hw.ncpu") + self.assertEqual(psutil.cpu_count(logical=True), syst) + + # --- virtual_memory(); tests against muse + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + def test_total(self): + num = muse('Total') + self.assertEqual(psutil.virtual_memory().total, num) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_active(self): + num = muse('Active') + self.assertAlmostEqual(psutil.virtual_memory().active, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_inactive(self): + num = muse('Inactive') + self.assertAlmostEqual(psutil.virtual_memory().inactive, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_wired(self): + num = muse('Wired') + self.assertAlmostEqual(psutil.virtual_memory().wired, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_cached(self): + num = muse('Cache') + self.assertAlmostEqual(psutil.virtual_memory().cached, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_free(self): + num = muse('Free') + self.assertAlmostEqual(psutil.virtual_memory().free, num, + delta=TOLERANCE) + + @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") + @retry_before_failing() + def test_buffers(self): + num = muse('Buffer') + self.assertAlmostEqual(psutil.virtual_memory().buffers, num, + delta=TOLERANCE) + + +def main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) diff --git a/python/psutil/test/_linux.py b/python/psutil/test/_linux.py new file mode 100644 index 000000000..c1927ea8b --- /dev/null +++ b/python/psutil/test/_linux.py @@ -0,0 +1,473 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Linux specific tests. These are implicitly run by test_psutil.py.""" + +from __future__ import division +import contextlib +import errno +import fcntl +import io +import os +import pprint +import re +import socket +import struct +import sys +import tempfile +import time +import warnings + +try: + from unittest import mock # py3 +except ImportError: + import mock # requires "pip install mock" + +from test_psutil import POSIX, TOLERANCE, TRAVIS, LINUX +from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess, + retry_before_failing, get_kernel_version, unittest, + which, call_until) + +import psutil +import psutil._pslinux +from psutil._compat import PY3, u + + +SIOCGIFADDR = 0x8915 +SIOCGIFCONF = 0x8912 +SIOCGIFHWADDR = 0x8927 + + +def get_ipv4_address(ifname): + ifname = ifname[:15] + if PY3: + ifname = bytes(ifname, 'ascii') + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + with contextlib.closing(s): + return socket.inet_ntoa( + fcntl.ioctl(s.fileno(), + SIOCGIFADDR, + struct.pack('256s', ifname))[20:24]) + + +def get_mac_address(ifname): + ifname = ifname[:15] + if PY3: + ifname = bytes(ifname, 'ascii') + s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + with contextlib.closing(s): + info = fcntl.ioctl( + s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname)) + if PY3: + def ord(x): + return x + else: + import __builtin__ + ord = __builtin__.ord + return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1] + + +@unittest.skipUnless(LINUX, "not a Linux system") +class LinuxSpecificTestCase(unittest.TestCase): + + @unittest.skipIf( + POSIX and not hasattr(os, 'statvfs'), + reason="os.statvfs() function not available on this platform") + @skip_on_not_implemented() + def test_disks(self): + # test psutil.disk_usage() and psutil.disk_partitions() + # against "df -a" + def df(path): + out = sh('df -P -B 1 "%s"' % path).strip() + lines = out.split('\n') + lines.pop(0) + line = lines.pop(0) + dev, total, used, free = line.split()[:4] + if dev == 'none': + dev = '' + total, used, free = int(total), int(used), int(free) + return dev, total, used, free + + for part in psutil.disk_partitions(all=False): + usage = psutil.disk_usage(part.mountpoint) + dev, total, used, free = df(part.mountpoint) + self.assertEqual(part.device, dev) + self.assertEqual(usage.total, total) + # 10 MB tollerance + if abs(usage.free - free) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % (usage.free, free)) + if abs(usage.used - used) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % (usage.used, used)) + + def test_memory_maps(self): + sproc = get_test_subprocess() + time.sleep(1) + p = psutil.Process(sproc.pid) + maps = p.memory_maps(grouped=False) + pmap = sh('pmap -x %s' % p.pid).split('\n') + # get rid of header + del pmap[0] + del pmap[0] + while maps and pmap: + this = maps.pop(0) + other = pmap.pop(0) + addr, _, rss, dirty, mode, path = other.split(None, 5) + if not path.startswith('[') and not path.endswith(']'): + self.assertEqual(path, os.path.basename(this.path)) + self.assertEqual(int(rss) * 1024, this.rss) + # test only rwx chars, ignore 's' and 'p' + self.assertEqual(mode[:3], this.perms[:3]) + + def test_vmem_total(self): + lines = sh('free').split('\n')[1:] + total = int(lines[0].split()[1]) * 1024 + self.assertEqual(total, psutil.virtual_memory().total) + + @retry_before_failing() + def test_vmem_used(self): + lines = sh('free').split('\n')[1:] + used = int(lines[0].split()[2]) * 1024 + self.assertAlmostEqual(used, psutil.virtual_memory().used, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_free(self): + lines = sh('free').split('\n')[1:] + free = int(lines[0].split()[3]) * 1024 + self.assertAlmostEqual(free, psutil.virtual_memory().free, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_buffers(self): + lines = sh('free').split('\n')[1:] + buffers = int(lines[0].split()[5]) * 1024 + self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_cached(self): + lines = sh('free').split('\n')[1:] + cached = int(lines[0].split()[6]) * 1024 + self.assertAlmostEqual(cached, psutil.virtual_memory().cached, + delta=TOLERANCE) + + def test_swapmem_total(self): + lines = sh('free').split('\n')[1:] + total = int(lines[2].split()[1]) * 1024 + self.assertEqual(total, psutil.swap_memory().total) + + @retry_before_failing() + def test_swapmem_used(self): + lines = sh('free').split('\n')[1:] + used = int(lines[2].split()[2]) * 1024 + self.assertAlmostEqual(used, psutil.swap_memory().used, + delta=TOLERANCE) + + @retry_before_failing() + def test_swapmem_free(self): + lines = sh('free').split('\n')[1:] + free = int(lines[2].split()[3]) * 1024 + self.assertAlmostEqual(free, psutil.swap_memory().free, + delta=TOLERANCE) + + @unittest.skipIf(TRAVIS, "unknown failure on travis") + def test_cpu_times(self): + fields = psutil.cpu_times()._fields + kernel_ver = re.findall('\d+\.\d+\.\d+', os.uname()[2])[0] + kernel_ver_info = tuple(map(int, kernel_ver.split('.'))) + if kernel_ver_info >= (2, 6, 11): + self.assertIn('steal', fields) + else: + self.assertNotIn('steal', fields) + if kernel_ver_info >= (2, 6, 24): + self.assertIn('guest', fields) + else: + self.assertNotIn('guest', fields) + if kernel_ver_info >= (3, 2, 0): + self.assertIn('guest_nice', fields) + else: + self.assertNotIn('guest_nice', fields) + + def test_net_if_addrs_ips(self): + for name, addrs in psutil.net_if_addrs().items(): + for addr in addrs: + if addr.family == psutil.AF_LINK: + self.assertEqual(addr.address, get_mac_address(name)) + elif addr.family == socket.AF_INET: + self.assertEqual(addr.address, get_ipv4_address(name)) + # TODO: test for AF_INET6 family + + @unittest.skipUnless(which('ip'), "'ip' utility not available") + @unittest.skipIf(TRAVIS, "skipped on Travis") + def test_net_if_names(self): + out = sh("ip addr").strip() + nics = psutil.net_if_addrs() + found = 0 + for line in out.split('\n'): + line = line.strip() + if re.search("^\d+:", line): + found += 1 + name = line.split(':')[1].strip() + self.assertIn(name, nics.keys()) + self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( + pprint.pformat(nics), out)) + + @unittest.skipUnless(which("nproc"), "nproc utility not available") + def test_cpu_count_logical_w_nproc(self): + num = int(sh("nproc --all")) + self.assertEqual(psutil.cpu_count(logical=True), num) + + @unittest.skipUnless(which("lscpu"), "lscpu utility not available") + def test_cpu_count_logical_w_lscpu(self): + out = sh("lscpu -p") + num = len([x for x in out.split('\n') if not x.startswith('#')]) + self.assertEqual(psutil.cpu_count(logical=True), num) + + # --- mocked tests + + def test_virtual_memory_mocked_warnings(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + with warnings.catch_warnings(record=True) as ws: + warnings.simplefilter("always") + ret = psutil._pslinux.virtual_memory() + assert m.called + self.assertEqual(len(ws), 1) + w = ws[0] + self.assertTrue(w.filename.endswith('psutil/_pslinux.py')) + self.assertIn( + "'cached', 'active' and 'inactive' memory stats couldn't " + "be determined", str(w.message)) + self.assertEqual(ret.cached, 0) + self.assertEqual(ret.active, 0) + self.assertEqual(ret.inactive, 0) + + def test_swap_memory_mocked_warnings(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + with warnings.catch_warnings(record=True) as ws: + warnings.simplefilter("always") + ret = psutil._pslinux.swap_memory() + assert m.called + self.assertEqual(len(ws), 1) + w = ws[0] + self.assertTrue(w.filename.endswith('psutil/_pslinux.py')) + self.assertIn( + "'sin' and 'sout' swap memory stats couldn't " + "be determined", str(w.message)) + self.assertEqual(ret.sin, 0) + self.assertEqual(ret.sout, 0) + + def test_cpu_count_logical_mocked(self): + import psutil._pslinux + original = psutil._pslinux.cpu_count_logical() + # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in + # order to cause the parsing of /proc/cpuinfo and /proc/stat. + with mock.patch( + 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m: + self.assertEqual(psutil._pslinux.cpu_count_logical(), original) + assert m.called + + # Let's have open() return emtpy data and make sure None is + # returned ('cause we mimick os.cpu_count()). + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertIsNone(psutil._pslinux.cpu_count_logical()) + self.assertEqual(m.call_count, 2) + # /proc/stat should be the last one + self.assertEqual(m.call_args[0][0], '/proc/stat') + + # Let's push this a bit further and make sure /proc/cpuinfo + # parsing works as expected. + with open('/proc/cpuinfo', 'rb') as f: + cpuinfo_data = f.read() + fake_file = io.BytesIO(cpuinfo_data) + with mock.patch('psutil._pslinux.open', + return_value=fake_file, create=True) as m: + self.assertEqual(psutil._pslinux.cpu_count_logical(), original) + + def test_cpu_count_physical_mocked(self): + # Have open() return emtpy data and make sure None is returned + # ('cause we want to mimick os.cpu_count()) + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertIsNone(psutil._pslinux.cpu_count_physical()) + assert m.called + + def test_proc_open_files_file_gone(self): + # simulates a file which gets deleted during open_files() + # execution + p = psutil.Process() + files = p.open_files() + with tempfile.NamedTemporaryFile(): + # give the kernel some time to see the new file + call_until(p.open_files, "len(ret) != %i" % len(files)) + with mock.patch('psutil._pslinux.os.readlink', + side_effect=OSError(errno.ENOENT, "")) as m: + files = p.open_files() + assert not files + assert m.called + # also simulate the case where os.readlink() returns EINVAL + # in which case psutil is supposed to 'continue' + with mock.patch('psutil._pslinux.os.readlink', + side_effect=OSError(errno.EINVAL, "")) as m: + self.assertEqual(p.open_files(), []) + assert m.called + + def test_proc_terminal_mocked(self): + with mock.patch('psutil._pslinux._psposix._get_terminal_map', + return_value={}) as m: + self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) + assert m.called + + def test_proc_num_ctx_switches_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).num_ctx_switches) + assert m.called + + def test_proc_num_threads_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).num_threads) + assert m.called + + def test_proc_ppid_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).ppid) + assert m.called + + def test_proc_uids_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).uids) + assert m.called + + def test_proc_gids_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).gids) + assert m.called + + def test_proc_cmdline_mocked(self): + # see: https://github.com/giampaolo/psutil/issues/639 + p = psutil.Process() + fake_file = io.StringIO(u('foo\x00bar\x00')) + with mock.patch('psutil._pslinux.open', + return_value=fake_file, create=True) as m: + p.cmdline() == ['foo', 'bar'] + assert m.called + fake_file = io.StringIO(u('foo\x00bar\x00\x00')) + with mock.patch('psutil._pslinux.open', + return_value=fake_file, create=True) as m: + p.cmdline() == ['foo', 'bar', ''] + assert m.called + + def test_proc_io_counters_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + NotImplementedError, + psutil._pslinux.Process(os.getpid()).io_counters) + assert m.called + + def test_boot_time_mocked(self): + with mock.patch('psutil._pslinux.open', create=True) as m: + self.assertRaises( + RuntimeError, + psutil._pslinux.boot_time) + assert m.called + + def test_users_mocked(self): + # Make sure ':0' and ':0.0' (returned by C ext) are converted + # to 'localhost'. + with mock.patch('psutil._pslinux.cext.users', + return_value=[('giampaolo', 'pts/2', ':0', + 1436573184.0, True)]) as m: + self.assertEqual(psutil.users()[0].host, 'localhost') + assert m.called + with mock.patch('psutil._pslinux.cext.users', + return_value=[('giampaolo', 'pts/2', ':0.0', + 1436573184.0, True)]) as m: + self.assertEqual(psutil.users()[0].host, 'localhost') + assert m.called + # ...otherwise it should be returned as-is + with mock.patch('psutil._pslinux.cext.users', + return_value=[('giampaolo', 'pts/2', 'foo', + 1436573184.0, True)]) as m: + self.assertEqual(psutil.users()[0].host, 'foo') + assert m.called + + def test_disk_partitions_mocked(self): + # Test that ZFS partitions are returned. + with open("/proc/filesystems", "r") as f: + data = f.read() + if 'zfs' in data: + for part in psutil.disk_partitions(): + if part.fstype == 'zfs': + break + else: + self.fail("couldn't find any ZFS partition") + else: + # No ZFS partitions on this system. Let's fake one. + fake_file = io.StringIO(u("nodev\tzfs\n")) + with mock.patch('psutil._pslinux.open', + return_value=fake_file, create=True) as m1: + with mock.patch( + 'psutil._pslinux.cext.disk_partitions', + return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2: + ret = psutil.disk_partitions() + assert m1.called + assert m2.called + assert ret + self.assertEqual(ret[0].fstype, 'zfs') + + # --- tests for specific kernel versions + + @unittest.skipUnless( + get_kernel_version() >= (2, 6, 36), + "prlimit() not available on this Linux kernel version") + def test_prlimit_availability(self): + # prlimit() should be available starting from kernel 2.6.36 + p = psutil.Process(os.getpid()) + p.rlimit(psutil.RLIMIT_NOFILE) + # if prlimit() is supported *at least* these constants should + # be available + self.assertTrue(hasattr(psutil, "RLIM_INFINITY")) + self.assertTrue(hasattr(psutil, "RLIMIT_AS")) + self.assertTrue(hasattr(psutil, "RLIMIT_CORE")) + self.assertTrue(hasattr(psutil, "RLIMIT_CPU")) + self.assertTrue(hasattr(psutil, "RLIMIT_DATA")) + self.assertTrue(hasattr(psutil, "RLIMIT_FSIZE")) + self.assertTrue(hasattr(psutil, "RLIMIT_LOCKS")) + self.assertTrue(hasattr(psutil, "RLIMIT_MEMLOCK")) + self.assertTrue(hasattr(psutil, "RLIMIT_NOFILE")) + self.assertTrue(hasattr(psutil, "RLIMIT_NPROC")) + self.assertTrue(hasattr(psutil, "RLIMIT_RSS")) + self.assertTrue(hasattr(psutil, "RLIMIT_STACK")) + + @unittest.skipUnless( + get_kernel_version() >= (3, 0), + "prlimit constants not available on this Linux kernel version") + def test_resource_consts_kernel_v(self): + # more recent constants + self.assertTrue(hasattr(psutil, "RLIMIT_MSGQUEUE")) + self.assertTrue(hasattr(psutil, "RLIMIT_NICE")) + self.assertTrue(hasattr(psutil, "RLIMIT_RTPRIO")) + self.assertTrue(hasattr(psutil, "RLIMIT_RTTIME")) + self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING")) + + +def main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) diff --git a/python/psutil/test/_osx.py b/python/psutil/test/_osx.py new file mode 100644 index 000000000..6e6e4380e --- /dev/null +++ b/python/psutil/test/_osx.py @@ -0,0 +1,160 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""OSX specific tests. These are implicitly run by test_psutil.py.""" + +import os +import re +import subprocess +import sys +import time + +import psutil + +from psutil._compat import PY3 +from test_psutil import (TOLERANCE, OSX, sh, get_test_subprocess, + reap_children, retry_before_failing, unittest) + + +PAGESIZE = os.sysconf("SC_PAGE_SIZE") + + +def sysctl(cmdline): + """Expects a sysctl command with an argument and parse the result + returning only the value of interest. + """ + p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) + result = p.communicate()[0].strip().split()[1] + if PY3: + result = str(result, sys.stdout.encoding) + try: + return int(result) + except ValueError: + return result + + +def vm_stat(field): + """Wrapper around 'vm_stat' cmdline utility.""" + out = sh('vm_stat') + for line in out.split('\n'): + if field in line: + break + else: + raise ValueError("line not found") + return int(re.search('\d+', line).group(0)) * PAGESIZE + + +@unittest.skipUnless(OSX, "not an OSX system") +class OSXSpecificTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_process_create_time(self): + cmdline = "ps -o lstart -p %s" % self.pid + p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) + output = p.communicate()[0] + if PY3: + output = str(output, sys.stdout.encoding) + start_ps = output.replace('STARTED', '').strip() + start_psutil = psutil.Process(self.pid).create_time() + start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", + time.localtime(start_psutil)) + self.assertEqual(start_ps, start_psutil) + + def test_disks(self): + # test psutil.disk_usage() and psutil.disk_partitions() + # against "df -a" + def df(path): + out = sh('df -k "%s"' % path).strip() + lines = out.split('\n') + lines.pop(0) + line = lines.pop(0) + dev, total, used, free = line.split()[:4] + if dev == 'none': + dev = '' + total = int(total) * 1024 + used = int(used) * 1024 + free = int(free) * 1024 + return dev, total, used, free + + for part in psutil.disk_partitions(all=False): + usage = psutil.disk_usage(part.mountpoint) + dev, total, used, free = df(part.mountpoint) + self.assertEqual(part.device, dev) + self.assertEqual(usage.total, total) + # 10 MB tollerance + if abs(usage.free - free) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % usage.free, free) + if abs(usage.used - used) > 10 * 1024 * 1024: + self.fail("psutil=%s, df=%s" % usage.used, used) + + # --- virtual mem + + def test_vmem_total(self): + sysctl_hwphymem = sysctl('sysctl hw.memsize') + self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total) + + @retry_before_failing() + def test_vmem_free(self): + num = vm_stat("free") + self.assertAlmostEqual(psutil.virtual_memory().free, num, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_active(self): + num = vm_stat("active") + self.assertAlmostEqual(psutil.virtual_memory().active, num, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_inactive(self): + num = vm_stat("inactive") + self.assertAlmostEqual(psutil.virtual_memory().inactive, num, + delta=TOLERANCE) + + @retry_before_failing() + def test_vmem_wired(self): + num = vm_stat("wired") + self.assertAlmostEqual(psutil.virtual_memory().wired, num, + delta=TOLERANCE) + + # --- swap mem + + def test_swapmem_sin(self): + num = vm_stat("Pageins") + self.assertEqual(psutil.swap_memory().sin, num) + + def test_swapmem_sout(self): + num = vm_stat("Pageouts") + self.assertEqual(psutil.swap_memory().sout, num) + + def test_swapmem_total(self): + tot1 = psutil.swap_memory().total + tot2 = 0 + # OSX uses multiple cache files: + # http://en.wikipedia.org/wiki/Paging#OS_X + for name in os.listdir("/var/vm/"): + file = os.path.join("/var/vm", name) + if os.path.isfile(file): + tot2 += os.path.getsize(file) + self.assertEqual(tot1, tot2) + + +def main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) diff --git a/python/psutil/test/_posix.py b/python/psutil/test/_posix.py new file mode 100644 index 000000000..e6c56aac3 --- /dev/null +++ b/python/psutil/test/_posix.py @@ -0,0 +1,258 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""POSIX specific tests. These are implicitly run by test_psutil.py.""" + +import datetime +import os +import subprocess +import sys +import time + +import psutil + +from psutil._compat import PY3, callable +from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS +from test_psutil import (get_test_subprocess, skip_on_access_denied, + retry_before_failing, reap_children, sh, unittest, + get_kernel_version, wait_for_pid) + + +def ps(cmd): + """Expects a ps command with a -o argument and parse the result + returning only the value of interest. + """ + if not LINUX: + cmd = cmd.replace(" --no-headers ", " ") + if SUNOS: + cmd = cmd.replace("-o command", "-o comm") + cmd = cmd.replace("-o start", "-o stime") + p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) + output = p.communicate()[0].strip() + if PY3: + output = str(output, sys.stdout.encoding) + if not LINUX: + output = output.split('\n')[1].strip() + try: + return int(output) + except ValueError: + return output + + +@unittest.skipUnless(POSIX, "not a POSIX system") +class PosixSpecificTestCase(unittest.TestCase): + """Compare psutil results against 'ps' command line utility.""" + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess([PYTHON, "-E", "-O"], + stdin=subprocess.PIPE).pid + wait_for_pid(cls.pid) + + @classmethod + def tearDownClass(cls): + reap_children() + + # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps + + def test_process_parent_pid(self): + ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid) + ppid_psutil = psutil.Process(self.pid).ppid() + self.assertEqual(ppid_ps, ppid_psutil) + + def test_process_uid(self): + uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid) + uid_psutil = psutil.Process(self.pid).uids().real + self.assertEqual(uid_ps, uid_psutil) + + def test_process_gid(self): + gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid) + gid_psutil = psutil.Process(self.pid).gids().real + self.assertEqual(gid_ps, gid_psutil) + + def test_process_username(self): + username_ps = ps("ps --no-headers -o user -p %s" % self.pid) + username_psutil = psutil.Process(self.pid).username() + self.assertEqual(username_ps, username_psutil) + + @skip_on_access_denied() + @retry_before_failing() + def test_process_rss_memory(self): + # give python interpreter some time to properly initialize + # so that the results are the same + time.sleep(0.1) + rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid) + rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024 + self.assertEqual(rss_ps, rss_psutil) + + @skip_on_access_denied() + @retry_before_failing() + def test_process_vsz_memory(self): + # give python interpreter some time to properly initialize + # so that the results are the same + time.sleep(0.1) + vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid) + vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024 + self.assertEqual(vsz_ps, vsz_psutil) + + def test_process_name(self): + # use command + arg since "comm" keyword not supported on all platforms + name_ps = ps("ps --no-headers -o command -p %s" % ( + self.pid)).split(' ')[0] + # remove path if there is any, from the command + name_ps = os.path.basename(name_ps).lower() + name_psutil = psutil.Process(self.pid).name().lower() + self.assertEqual(name_ps, name_psutil) + + @unittest.skipIf(OSX or BSD, + 'ps -o start not available') + def test_process_create_time(self): + time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0] + time_psutil = psutil.Process(self.pid).create_time() + time_psutil_tstamp = datetime.datetime.fromtimestamp( + time_psutil).strftime("%H:%M:%S") + # sometimes ps shows the time rounded up instead of down, so we check + # for both possible values + round_time_psutil = round(time_psutil) + round_time_psutil_tstamp = datetime.datetime.fromtimestamp( + round_time_psutil).strftime("%H:%M:%S") + self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp]) + + def test_process_exe(self): + ps_pathname = ps("ps --no-headers -o command -p %s" % + self.pid).split(' ')[0] + psutil_pathname = psutil.Process(self.pid).exe() + try: + self.assertEqual(ps_pathname, psutil_pathname) + except AssertionError: + # certain platforms such as BSD are more accurate returning: + # "/usr/local/bin/python2.7" + # ...instead of: + # "/usr/local/bin/python" + # We do not want to consider this difference in accuracy + # an error. + adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] + self.assertEqual(ps_pathname, adjusted_ps_pathname) + + def test_process_cmdline(self): + ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid) + psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline()) + if SUNOS: + # ps on Solaris only shows the first part of the cmdline + psutil_cmdline = psutil_cmdline.split(" ")[0] + self.assertEqual(ps_cmdline, psutil_cmdline) + + @retry_before_failing() + def test_pids(self): + # Note: this test might fail if the OS is starting/killing + # other processes in the meantime + if SUNOS: + cmd = ["ps", "ax"] + else: + cmd = ["ps", "ax", "-o", "pid"] + p = get_test_subprocess(cmd, stdout=subprocess.PIPE) + output = p.communicate()[0].strip() + if PY3: + output = str(output, sys.stdout.encoding) + pids_ps = [] + for line in output.split('\n')[1:]: + if line: + pid = int(line.split()[0].strip()) + pids_ps.append(pid) + # remove ps subprocess pid which is supposed to be dead in meantime + pids_ps.remove(p.pid) + pids_psutil = psutil.pids() + pids_ps.sort() + pids_psutil.sort() + + # on OSX ps doesn't show pid 0 + if OSX and 0 not in pids_ps: + pids_ps.insert(0, 0) + + if pids_ps != pids_psutil: + difference = [x for x in pids_psutil if x not in pids_ps] + \ + [x for x in pids_ps if x not in pids_psutil] + self.fail("difference: " + str(difference)) + + # for some reason ifconfig -a does not report all interfaces + # returned by psutil + @unittest.skipIf(SUNOS, "test not reliable on SUNOS") + @unittest.skipIf(TRAVIS, "test not reliable on Travis") + def test_nic_names(self): + p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE) + output = p.communicate()[0].strip() + if PY3: + output = str(output, sys.stdout.encoding) + for nic in psutil.net_io_counters(pernic=True).keys(): + for line in output.split(): + if line.startswith(nic): + break + else: + self.fail( + "couldn't find %s nic in 'ifconfig -a' output\n%s" % ( + nic, output)) + + @retry_before_failing() + def test_users(self): + out = sh("who") + lines = out.split('\n') + users = [x.split()[0] for x in lines] + self.assertEqual(len(users), len(psutil.users())) + terminals = [x.split()[1] for x in lines] + for u in psutil.users(): + self.assertTrue(u.name in users, u.name) + self.assertTrue(u.terminal in terminals, u.terminal) + + def test_fds_open(self): + # Note: this fails from time to time; I'm keen on thinking + # it doesn't mean something is broken + def call(p, attr): + args = () + attr = getattr(p, name, None) + if attr is not None and callable(attr): + if name == 'rlimit': + args = (psutil.RLIMIT_NOFILE,) + attr(*args) + else: + attr + + p = psutil.Process(os.getpid()) + failures = [] + ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice', + 'send_signal', 'wait', 'children', 'as_dict'] + if LINUX and get_kernel_version() < (2, 6, 36): + ignored_names.append('rlimit') + if LINUX and get_kernel_version() < (2, 6, 23): + ignored_names.append('num_ctx_switches') + for name in dir(psutil.Process): + if (name.startswith('_') or name in ignored_names): + continue + else: + try: + num1 = p.num_fds() + for x in range(2): + call(p, name) + num2 = p.num_fds() + except psutil.AccessDenied: + pass + else: + if abs(num2 - num1) > 1: + fail = "failure while processing Process.%s method " \ + "(before=%s, after=%s)" % (name, num1, num2) + failures.append(fail) + if failures: + self.fail('\n' + '\n'.join(failures)) + + +def main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) diff --git a/python/psutil/test/_sunos.py b/python/psutil/test/_sunos.py new file mode 100644 index 000000000..3d54ccd8c --- /dev/null +++ b/python/psutil/test/_sunos.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Sun OS specific tests. These are implicitly run by test_psutil.py.""" + +import sys +import os + +from test_psutil import SUNOS, sh, unittest +import psutil + + +@unittest.skipUnless(SUNOS, "not a SunOS system") +class SunOSSpecificTestCase(unittest.TestCase): + + def test_swap_memory(self): + out = sh('env PATH=/usr/sbin:/sbin:%s swap -l -k' % os.environ['PATH']) + lines = out.strip().split('\n')[1:] + if not lines: + raise ValueError('no swap device(s) configured') + total = free = 0 + for line in lines: + line = line.split() + t, f = line[-2:] + t = t.replace('K', '') + f = f.replace('K', '') + total += int(int(t) * 1024) + free += int(int(f) * 1024) + used = total - free + + psutil_swap = psutil.swap_memory() + self.assertEqual(psutil_swap.total, total) + self.assertEqual(psutil_swap.used, used) + self.assertEqual(psutil_swap.free, free) + + +def main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(SunOSSpecificTestCase)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) diff --git a/python/psutil/test/_windows.py b/python/psutil/test/_windows.py new file mode 100644 index 000000000..b7477bfeb --- /dev/null +++ b/python/psutil/test/_windows.py @@ -0,0 +1,464 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +"""Windows specific tests. These are implicitly run by test_psutil.py.""" + +import errno +import os +import platform +import signal +import subprocess +import sys +import time +import traceback + +from test_psutil import APPVEYOR, WINDOWS +from test_psutil import get_test_subprocess, reap_children, unittest + +import mock +try: + import wmi +except ImportError: + wmi = None +try: + import win32api + import win32con +except ImportError: + win32api = win32con = None + +from psutil._compat import PY3, callable, long +import psutil + + +cext = psutil._psplatform.cext + + +def wrap_exceptions(fun): + def wrapper(self, *args, **kwargs): + try: + return fun(self, *args, **kwargs) + except OSError as err: + from psutil._pswindows import ACCESS_DENIED_SET + if err.errno in ACCESS_DENIED_SET: + raise psutil.AccessDenied(None, None) + if err.errno == errno.ESRCH: + raise psutil.NoSuchProcess(None, None) + raise + return wrapper + + +@unittest.skipUnless(WINDOWS, "not a Windows system") +class WindowsSpecificTestCase(unittest.TestCase): + + @classmethod + def setUpClass(cls): + cls.pid = get_test_subprocess().pid + + @classmethod + def tearDownClass(cls): + reap_children() + + def test_issue_24(self): + p = psutil.Process(0) + self.assertRaises(psutil.AccessDenied, p.kill) + + def test_special_pid(self): + p = psutil.Process(4) + self.assertEqual(p.name(), 'System') + # use __str__ to access all common Process properties to check + # that nothing strange happens + str(p) + p.username() + self.assertTrue(p.create_time() >= 0.0) + try: + rss, vms = p.memory_info() + except psutil.AccessDenied: + # expected on Windows Vista and Windows 7 + if not platform.uname()[1] in ('vista', 'win-7', 'win7'): + raise + else: + self.assertTrue(rss > 0) + + def test_send_signal(self): + p = psutil.Process(self.pid) + self.assertRaises(ValueError, p.send_signal, signal.SIGINT) + + def test_nic_names(self): + p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE) + out = p.communicate()[0] + if PY3: + out = str(out, sys.stdout.encoding) + nics = psutil.net_io_counters(pernic=True).keys() + for nic in nics: + if "pseudo-interface" in nic.replace(' ', '-').lower(): + continue + if nic not in out: + self.fail( + "%r nic wasn't found in 'ipconfig /all' output" % nic) + + def test_exe(self): + for p in psutil.process_iter(): + try: + self.assertEqual(os.path.basename(p.exe()), p.name()) + except psutil.Error: + pass + + # --- Process class tests + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_name(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(p.name(), w.Caption) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_exe(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + # Note: wmi reports the exe as a lower case string. + # Being Windows paths case-insensitive we ignore that. + self.assertEqual(p.exe().lower(), w.ExecutablePath.lower()) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_cmdline(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + self.assertEqual(' '.join(p.cmdline()), + w.CommandLine.replace('"', '')) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_username(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + domain, _, username = w.GetOwner() + username = "%s\\%s" % (domain, username) + self.assertEqual(p.username(), username) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_rss_memory(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + rss = p.memory_info().rss + self.assertEqual(rss, int(w.WorkingSetSize)) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_vms_memory(self): + time.sleep(0.1) + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + vms = p.memory_info().vms + # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx + # ...claims that PageFileUsage is represented in Kilo + # bytes but funnily enough on certain platforms bytes are + # returned instead. + wmi_usage = int(w.PageFileUsage) + if (vms != wmi_usage) and (vms != wmi_usage * 1024): + self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_process_create_time(self): + w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + p = psutil.Process(self.pid) + wmic_create = str(w.CreationDate.split('.')[0]) + psutil_create = time.strftime("%Y%m%d%H%M%S", + time.localtime(p.create_time())) + self.assertEqual(wmic_create, psutil_create) + + # --- psutil namespace functions and constants tests + + @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ, + 'NUMBER_OF_PROCESSORS env var is not available') + def test_cpu_count(self): + num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) + self.assertEqual(num_cpus, psutil.cpu_count()) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_total_phymem(self): + w = wmi.WMI().Win32_ComputerSystem()[0] + self.assertEqual(int(w.TotalPhysicalMemory), + psutil.virtual_memory().total) + + # @unittest.skipIf(wmi is None, "wmi module is not installed") + # def test__UPTIME(self): + # # _UPTIME constant is not public but it is used internally + # # as value to return for pid 0 creation time. + # # WMI behaves the same. + # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] + # p = psutil.Process(0) + # wmic_create = str(w.CreationDate.split('.')[0]) + # psutil_create = time.strftime("%Y%m%d%H%M%S", + # time.localtime(p.create_time())) + # + + # Note: this test is not very reliable + @unittest.skipIf(wmi is None, "wmi module is not installed") + @unittest.skipIf(APPVEYOR, "test not relieable on appveyor") + def test_pids(self): + # Note: this test might fail if the OS is starting/killing + # other processes in the meantime + w = wmi.WMI().Win32_Process() + wmi_pids = set([x.ProcessId for x in w]) + psutil_pids = set(psutil.pids()) + self.assertEqual(wmi_pids, psutil_pids) + + @unittest.skipIf(wmi is None, "wmi module is not installed") + def test_disks(self): + ps_parts = psutil.disk_partitions(all=True) + wmi_parts = wmi.WMI().Win32_LogicalDisk() + for ps_part in ps_parts: + for wmi_part in wmi_parts: + if ps_part.device.replace('\\', '') == wmi_part.DeviceID: + if not ps_part.mountpoint: + # this is usually a CD-ROM with no disk inserted + break + try: + usage = psutil.disk_usage(ps_part.mountpoint) + except OSError as err: + if err.errno == errno.ENOENT: + # usually this is the floppy + break + else: + raise + self.assertEqual(usage.total, int(wmi_part.Size)) + wmi_free = int(wmi_part.FreeSpace) + self.assertEqual(usage.free, wmi_free) + # 10 MB tollerance + if abs(usage.free - wmi_free) > 10 * 1024 * 1024: + self.fail("psutil=%s, wmi=%s" % ( + usage.free, wmi_free)) + break + else: + self.fail("can't find partition %s" % repr(ps_part)) + + @unittest.skipIf(win32api is None, "pywin32 module is not installed") + def test_num_handles(self): + p = psutil.Process(os.getpid()) + before = p.num_handles() + handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, + win32con.FALSE, os.getpid()) + after = p.num_handles() + self.assertEqual(after, before + 1) + win32api.CloseHandle(handle) + self.assertEqual(p.num_handles(), before) + + @unittest.skipIf(win32api is None, "pywin32 module is not installed") + def test_num_handles_2(self): + # Note: this fails from time to time; I'm keen on thinking + # it doesn't mean something is broken + def call(p, attr): + attr = getattr(p, name, None) + if attr is not None and callable(attr): + attr() + else: + attr + + p = psutil.Process(self.pid) + failures = [] + for name in dir(psutil.Process): + if name.startswith('_') \ + or name in ('terminate', 'kill', 'suspend', 'resume', + 'nice', 'send_signal', 'wait', 'children', + 'as_dict'): + continue + else: + try: + call(p, name) + num1 = p.num_handles() + call(p, name) + num2 = p.num_handles() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + else: + if num2 > num1: + fail = \ + "failure while processing Process.%s method " \ + "(before=%s, after=%s)" % (name, num1, num2) + failures.append(fail) + if failures: + self.fail('\n' + '\n'.join(failures)) + + def test_name_always_available(self): + # On Windows name() is never supposed to raise AccessDenied, + # see https://github.com/giampaolo/psutil/issues/627 + for p in psutil.process_iter(): + try: + p.name() + except psutil.NoSuchProcess(): + pass + + +@unittest.skipUnless(WINDOWS, "not a Windows system") +class TestDualProcessImplementation(unittest.TestCase): + """ + Certain APIs on Windows have 2 internal implementations, one + based on documented Windows APIs, another one based + NtQuerySystemInformation() which gets called as fallback in + case the first fails because of limited permission error. + Here we test that the two methods return the exact same value, + see: + https://github.com/giampaolo/psutil/issues/304 + """ + + fun_names = [ + # function name, tolerance + ('proc_cpu_times', 0.2), + ('proc_create_time', 0.5), + ('proc_num_handles', 1), # 1 because impl #1 opens a handle + ('proc_memory_info', 1024), # KB + ('proc_io_counters', 0), + ] + + def test_compare_values(self): + def assert_ge_0(obj): + if isinstance(obj, tuple): + for value in obj: + self.assertGreaterEqual(value, 0, msg=obj) + elif isinstance(obj, (int, long, float)): + self.assertGreaterEqual(obj, 0) + else: + assert 0 # case not handled which needs to be fixed + + def compare_with_tolerance(ret1, ret2, tolerance): + if ret1 == ret2: + return + else: + if isinstance(ret2, (int, long, float)): + diff = abs(ret1 - ret2) + self.assertLessEqual(diff, tolerance) + elif isinstance(ret2, tuple): + for a, b in zip(ret1, ret2): + diff = abs(a - b) + self.assertLessEqual(diff, tolerance) + + from psutil._pswindows import ntpinfo + failures = [] + for p in psutil.process_iter(): + try: + nt = ntpinfo(*cext.proc_info(p.pid)) + except psutil.NoSuchProcess: + continue + assert_ge_0(nt) + + for name, tolerance in self.fun_names: + if name == 'proc_memory_info' and p.pid == os.getpid(): + continue + if name == 'proc_create_time' and p.pid in (0, 4): + continue + meth = wrap_exceptions(getattr(cext, name)) + try: + ret = meth(p.pid) + except (psutil.NoSuchProcess, psutil.AccessDenied): + continue + # compare values + try: + if name == 'proc_cpu_times': + compare_with_tolerance(ret[0], nt.user_time, tolerance) + compare_with_tolerance(ret[1], + nt.kernel_time, tolerance) + elif name == 'proc_create_time': + compare_with_tolerance(ret, nt.create_time, tolerance) + elif name == 'proc_num_handles': + compare_with_tolerance(ret, nt.num_handles, tolerance) + elif name == 'proc_io_counters': + compare_with_tolerance(ret[0], nt.io_rcount, tolerance) + compare_with_tolerance(ret[1], nt.io_wcount, tolerance) + compare_with_tolerance(ret[2], nt.io_rbytes, tolerance) + compare_with_tolerance(ret[3], nt.io_wbytes, tolerance) + elif name == 'proc_memory_info': + try: + rawtupl = cext.proc_memory_info_2(p.pid) + except psutil.NoSuchProcess: + continue + compare_with_tolerance(ret, rawtupl, tolerance) + except AssertionError: + trace = traceback.format_exc() + msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' % ( + trace, p.pid, name, ret, nt) + failures.append(msg) + break + + if failures: + self.fail('\n\n'.join(failures)) + + # --- + # same tests as above but mimicks the AccessDenied failure of + # the first (fast) method failing with AD. + # TODO: currently does not take tolerance into account. + + def test_name(self): + name = psutil.Process().name() + with mock.patch("psutil._psplatform.cext.proc_exe", + side_effect=psutil.AccessDenied(os.getpid())) as fun: + psutil.Process().name() == name + assert fun.called + + def test_memory_info(self): + mem = psutil.Process().memory_info() + with mock.patch("psutil._psplatform.cext.proc_memory_info", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().memory_info() == mem + assert fun.called + + def test_create_time(self): + ctime = psutil.Process().create_time() + with mock.patch("psutil._psplatform.cext.proc_create_time", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().create_time() == ctime + assert fun.called + + def test_cpu_times(self): + cpu_times = psutil.Process().cpu_times() + with mock.patch("psutil._psplatform.cext.proc_cpu_times", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().cpu_times() == cpu_times + assert fun.called + + def test_io_counters(self): + io_counters = psutil.Process().io_counters() + with mock.patch("psutil._psplatform.cext.proc_io_counters", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().io_counters() == io_counters + assert fun.called + + def test_num_handles(self): + io_counters = psutil.Process().io_counters() + with mock.patch("psutil._psplatform.cext.proc_io_counters", + side_effect=OSError(errno.EPERM, "msg")) as fun: + psutil.Process().io_counters() == io_counters + assert fun.called + + # --- other tests + + def test_compare_name_exe(self): + for p in psutil.process_iter(): + try: + a = os.path.basename(p.exe()) + b = p.name() + except (psutil.NoSuchProcess, psutil.AccessDenied): + pass + else: + self.assertEqual(a, b) + + def test_zombies(self): + # test that NPS is raised by the 2nd implementation in case a + # process no longer exists + ZOMBIE_PID = max(psutil.pids()) + 5000 + for name, _ in self.fun_names: + meth = wrap_exceptions(getattr(cext, name)) + self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID) + + +def main(): + test_suite = unittest.TestSuite() + test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) + test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) diff --git a/python/psutil/test/test_memory_leaks.py b/python/psutil/test/test_memory_leaks.py new file mode 100644 index 000000000..6f02dc0ac --- /dev/null +++ b/python/psutil/test/test_memory_leaks.py @@ -0,0 +1,445 @@ +#!/usr/bin/env python + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +""" +A test script which attempts to detect memory leaks by calling C +functions many times and compare process memory usage before and +after the calls. It might produce false positives. +""" + +import functools +import gc +import os +import socket +import sys +import threading +import time + +import psutil +import psutil._common + +from psutil._compat import xrange, callable +from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, BSD, TESTFN, + RLIMIT_SUPPORT, TRAVIS) +from test_psutil import (reap_children, supports_ipv6, safe_remove, + get_test_subprocess) + +if sys.version_info < (2, 7): + import unittest2 as unittest # https://pypi.python.org/pypi/unittest2 +else: + import unittest + + +LOOPS = 1000 +TOLERANCE = 4096 +SKIP_PYTHON_IMPL = True + + +def skip_if_linux(): + return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL, + "not worth being tested on LINUX (pure python)") + + +class Base(unittest.TestCase): + proc = psutil.Process() + + def execute(self, function, *args, **kwargs): + def call_many_times(): + for x in xrange(LOOPS - 1): + self.call(function, *args, **kwargs) + del x + gc.collect() + return self.get_mem() + + self.call(function, *args, **kwargs) + self.assertEqual(gc.garbage, []) + self.assertEqual(threading.active_count(), 1) + + # RSS comparison + # step 1 + rss1 = call_many_times() + # step 2 + rss2 = call_many_times() + + difference = rss2 - rss1 + if difference > TOLERANCE: + # This doesn't necessarily mean we have a leak yet. + # At this point we assume that after having called the + # function so many times the memory usage is stabilized + # and if there are no leaks it should not increase any + # more. + # Let's keep calling fun for 3 more seconds and fail if + # we notice any difference. + stop_at = time.time() + 3 + while True: + self.call(function, *args, **kwargs) + if time.time() >= stop_at: + break + del stop_at + gc.collect() + rss3 = self.get_mem() + difference = rss3 - rss2 + if rss3 > rss2: + self.fail("rss2=%s, rss3=%s, difference=%s" + % (rss2, rss3, difference)) + + def execute_w_exc(self, exc, function, *args, **kwargs): + kwargs['_exc'] = exc + self.execute(function, *args, **kwargs) + + def get_mem(self): + return psutil.Process().memory_info()[0] + + def call(self, function, *args, **kwargs): + raise NotImplementedError("must be implemented in subclass") + + +class TestProcessObjectLeaks(Base): + """Test leaks of Process class methods and properties""" + + def setUp(self): + gc.collect() + + def tearDown(self): + reap_children() + + def call(self, function, *args, **kwargs): + if callable(function): + if '_exc' in kwargs: + exc = kwargs.pop('_exc') + self.assertRaises(exc, function, *args, **kwargs) + else: + try: + function(*args, **kwargs) + except psutil.Error: + pass + else: + meth = getattr(self.proc, function) + if '_exc' in kwargs: + exc = kwargs.pop('_exc') + self.assertRaises(exc, meth, *args, **kwargs) + else: + try: + meth(*args, **kwargs) + except psutil.Error: + pass + + @skip_if_linux() + def test_name(self): + self.execute('name') + + @skip_if_linux() + def test_cmdline(self): + self.execute('cmdline') + + @skip_if_linux() + def test_exe(self): + self.execute('exe') + + @skip_if_linux() + def test_ppid(self): + self.execute('ppid') + + @unittest.skipUnless(POSIX, "POSIX only") + @skip_if_linux() + def test_uids(self): + self.execute('uids') + + @unittest.skipUnless(POSIX, "POSIX only") + @skip_if_linux() + def test_gids(self): + self.execute('gids') + + @skip_if_linux() + def test_status(self): + self.execute('status') + + def test_nice_get(self): + self.execute('nice') + + def test_nice_set(self): + niceness = psutil.Process().nice() + self.execute('nice', niceness) + + @unittest.skipUnless(hasattr(psutil.Process, 'ionice'), + "Linux and Windows Vista only") + def test_ionice_get(self): + self.execute('ionice') + + @unittest.skipUnless(hasattr(psutil.Process, 'ionice'), + "Linux and Windows Vista only") + def test_ionice_set(self): + if WINDOWS: + value = psutil.Process().ionice() + self.execute('ionice', value) + else: + from psutil._pslinux import cext + self.execute('ionice', psutil.IOPRIO_CLASS_NONE) + fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0) + self.execute_w_exc(OSError, fun) + + @unittest.skipIf(OSX or SUNOS, "feature not supported on this platform") + @skip_if_linux() + def test_io_counters(self): + self.execute('io_counters') + + @unittest.skipUnless(WINDOWS, "not worth being tested on posix") + def test_username(self): + self.execute('username') + + @skip_if_linux() + def test_create_time(self): + self.execute('create_time') + + @skip_if_linux() + def test_num_threads(self): + self.execute('num_threads') + + @unittest.skipUnless(WINDOWS, "Windows only") + def test_num_handles(self): + self.execute('num_handles') + + @unittest.skipUnless(POSIX, "POSIX only") + @skip_if_linux() + def test_num_fds(self): + self.execute('num_fds') + + @skip_if_linux() + def test_threads(self): + self.execute('threads') + + @skip_if_linux() + def test_cpu_times(self): + self.execute('cpu_times') + + @skip_if_linux() + def test_memory_info(self): + self.execute('memory_info') + + @skip_if_linux() + def test_memory_info_ex(self): + self.execute('memory_info_ex') + + @unittest.skipUnless(POSIX, "POSIX only") + @skip_if_linux() + def test_terminal(self): + self.execute('terminal') + + @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL, + "not worth being tested on POSIX (pure python)") + def test_resume(self): + self.execute('resume') + + @skip_if_linux() + def test_cwd(self): + self.execute('cwd') + + @unittest.skipUnless(WINDOWS or LINUX or BSD, + "Windows or Linux or BSD only") + def test_cpu_affinity_get(self): + self.execute('cpu_affinity') + + @unittest.skipUnless(WINDOWS or LINUX or BSD, + "Windows or Linux or BSD only") + def test_cpu_affinity_set(self): + affinity = psutil.Process().cpu_affinity() + self.execute('cpu_affinity', affinity) + if not TRAVIS: + self.execute_w_exc(ValueError, 'cpu_affinity', [-1]) + + @skip_if_linux() + def test_open_files(self): + safe_remove(TESTFN) # needed after UNIX socket test has run + with open(TESTFN, 'w'): + self.execute('open_files') + + # OSX implementation is unbelievably slow + @unittest.skipIf(OSX, "OSX implementation is too slow") + @skip_if_linux() + def test_memory_maps(self): + self.execute('memory_maps') + + @unittest.skipUnless(LINUX, "Linux only") + @unittest.skipUnless(LINUX and RLIMIT_SUPPORT, + "only available on Linux >= 2.6.36") + def test_rlimit_get(self): + self.execute('rlimit', psutil.RLIMIT_NOFILE) + + @unittest.skipUnless(LINUX, "Linux only") + @unittest.skipUnless(LINUX and RLIMIT_SUPPORT, + "only available on Linux >= 2.6.36") + def test_rlimit_set(self): + limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE) + self.execute('rlimit', psutil.RLIMIT_NOFILE, limit) + self.execute_w_exc(OSError, 'rlimit', -1) + + @skip_if_linux() + # Windows implementation is based on a single system-wide function + @unittest.skipIf(WINDOWS, "tested later") + def test_connections(self): + def create_socket(family, type): + sock = socket.socket(family, type) + sock.bind(('', 0)) + if type == socket.SOCK_STREAM: + sock.listen(1) + return sock + + socks = [] + socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM)) + socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM)) + if supports_ipv6(): + socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM)) + socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM)) + if hasattr(socket, 'AF_UNIX'): + safe_remove(TESTFN) + s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + s.bind(TESTFN) + s.listen(1) + socks.append(s) + kind = 'all' + # TODO: UNIX sockets are temporarily implemented by parsing + # 'pfiles' cmd output; we don't want that part of the code to + # be executed. + if SUNOS: + kind = 'inet' + try: + self.execute('connections', kind=kind) + finally: + for s in socks: + s.close() + + +p = get_test_subprocess() +DEAD_PROC = psutil.Process(p.pid) +DEAD_PROC.kill() +DEAD_PROC.wait() +del p + + +class TestProcessObjectLeaksZombie(TestProcessObjectLeaks): + """Same as above but looks for leaks occurring when dealing with + zombie processes raising NoSuchProcess exception. + """ + proc = DEAD_PROC + + def call(self, *args, **kwargs): + try: + TestProcessObjectLeaks.call(self, *args, **kwargs) + except psutil.NoSuchProcess: + pass + + if not POSIX: + def test_kill(self): + self.execute('kill') + + def test_terminate(self): + self.execute('terminate') + + def test_suspend(self): + self.execute('suspend') + + def test_resume(self): + self.execute('resume') + + def test_wait(self): + self.execute('wait') + + +class TestModuleFunctionsLeaks(Base): + """Test leaks of psutil module functions.""" + + def setUp(self): + gc.collect() + + def call(self, function, *args, **kwargs): + fun = getattr(psutil, function) + fun(*args, **kwargs) + + @skip_if_linux() + def test_cpu_count_logical(self): + psutil.cpu_count = psutil._psplatform.cpu_count_logical + self.execute('cpu_count') + + @skip_if_linux() + def test_cpu_count_physical(self): + psutil.cpu_count = psutil._psplatform.cpu_count_physical + self.execute('cpu_count') + + @skip_if_linux() + def test_boot_time(self): + self.execute('boot_time') + + @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL, + "not worth being tested on POSIX (pure python)") + def test_pid_exists(self): + self.execute('pid_exists', os.getpid()) + + def test_virtual_memory(self): + self.execute('virtual_memory') + + # TODO: remove this skip when this gets fixed + @unittest.skipIf(SUNOS, + "not worth being tested on SUNOS (uses a subprocess)") + def test_swap_memory(self): + self.execute('swap_memory') + + @skip_if_linux() + def test_cpu_times(self): + self.execute('cpu_times') + + @skip_if_linux() + def test_per_cpu_times(self): + self.execute('cpu_times', percpu=True) + + @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL, + "not worth being tested on POSIX (pure python)") + def test_disk_usage(self): + self.execute('disk_usage', '.') + + def test_disk_partitions(self): + self.execute('disk_partitions') + + @skip_if_linux() + def test_net_io_counters(self): + self.execute('net_io_counters') + + @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'), + '/proc/diskstats not available on this Linux version') + @skip_if_linux() + def test_disk_io_counters(self): + self.execute('disk_io_counters') + + # XXX - on Windows this produces a false positive + @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows") + def test_users(self): + self.execute('users') + + @unittest.skipIf(LINUX, + "not worth being tested on Linux (pure python)") + def test_net_connections(self): + self.execute('net_connections') + + def test_net_if_addrs(self): + self.execute('net_if_addrs') + + @unittest.skipIf(TRAVIS, "EPERM on travis") + def test_net_if_stats(self): + self.execute('net_if_stats') + + +def main(): + test_suite = unittest.TestSuite() + tests = [TestProcessObjectLeaksZombie, + TestProcessObjectLeaks, + TestModuleFunctionsLeaks] + for test in tests: + test_suite.addTest(unittest.makeSuite(test)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) diff --git a/python/psutil/test/test_psutil.py b/python/psutil/test/test_psutil.py new file mode 100644 index 000000000..3b2e3587a --- /dev/null +++ b/python/psutil/test/test_psutil.py @@ -0,0 +1,3013 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. +# Use of this source code is governed by a BSD-style license that can be +# found in the LICENSE file. + +""" +psutil test suite. Run it with: +$ make test + +If you're on Python < 2.7 unittest2 module must be installed first: +https://pypi.python.org/pypi/unittest2 +""" + +from __future__ import division + +import ast +import atexit +import collections +import contextlib +import datetime +import errno +import functools +import imp +import json +import os +import pickle +import pprint +import re +import select +import shutil +import signal +import socket +import stat +import subprocess +import sys +import tempfile +import textwrap +import threading +import time +import traceback +import types +import warnings +from socket import AF_INET, SOCK_STREAM, SOCK_DGRAM +try: + import ipaddress # python >= 3.3 +except ImportError: + ipaddress = None +try: + from unittest import mock # py3 +except ImportError: + import mock # requires "pip install mock" + +import psutil +from psutil._compat import PY3, callable, long, unicode + +if sys.version_info < (2, 7): + import unittest2 as unittest # https://pypi.python.org/pypi/unittest2 +else: + import unittest +if sys.version_info >= (3, 4): + import enum +else: + enum = None + + +# =================================================================== +# --- Constants +# =================================================================== + +# conf for retry_before_failing() decorator +NO_RETRIES = 10 +# bytes tolerance for OS memory related tests +TOLERANCE = 500 * 1024 # 500KB +# the timeout used in functions which have to wait +GLOBAL_TIMEOUT = 3 + +AF_INET6 = getattr(socket, "AF_INET6") +AF_UNIX = getattr(socket, "AF_UNIX", None) +PYTHON = os.path.realpath(sys.executable) +DEVNULL = open(os.devnull, 'r+') +TESTFN = os.path.join(os.getcwd(), "$testfile") +TESTFN_UNICODE = TESTFN + "Æ’Å‘Å‘" +TESTFILE_PREFIX = 'psutil-test-suite-' +if not PY3: + try: + TESTFN_UNICODE = unicode(TESTFN_UNICODE, sys.getfilesystemencoding()) + except UnicodeDecodeError: + TESTFN_UNICODE = TESTFN + "???" + +EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), + '..', 'examples')) + +POSIX = os.name == 'posix' +WINDOWS = os.name == 'nt' +if WINDOWS: + WIN_VISTA = (6, 0, 0) +LINUX = sys.platform.startswith("linux") +OSX = sys.platform.startswith("darwin") +BSD = sys.platform.startswith("freebsd") +SUNOS = sys.platform.startswith("sunos") +VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil) + if x.startswith('STATUS_')] +# whether we're running this test suite on Travis (https://travis-ci.org/) +TRAVIS = bool(os.environ.get('TRAVIS')) +# whether we're running this test suite on Appveyor for Windows +# (http://www.appveyor.com/) +APPVEYOR = bool(os.environ.get('APPVEYOR')) + +if TRAVIS or 'tox' in sys.argv[0]: + import ipaddress +if TRAVIS or APPVEYOR: + GLOBAL_TIMEOUT = GLOBAL_TIMEOUT * 4 + + +# =================================================================== +# --- Utility functions +# =================================================================== + +def cleanup(): + reap_children(search_all=True) + safe_remove(TESTFN) + try: + safe_rmdir(TESTFN_UNICODE) + except UnicodeEncodeError: + pass + for path in _testfiles: + safe_remove(path) + +atexit.register(cleanup) +atexit.register(lambda: DEVNULL.close()) + + +_subprocesses_started = set() + + +def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, + stdin=DEVNULL, wait=False): + """Return a subprocess.Popen object to use in tests. + By default stdout and stderr are redirected to /dev/null and the + python interpreter is used as test process. + If 'wait' is True attemps to make sure the process is in a + reasonably initialized state. + """ + if cmd is None: + pyline = "" + if wait: + pyline += "open(r'%s', 'w'); " % TESTFN + pyline += "import time; time.sleep(60);" + cmd_ = [PYTHON, "-c", pyline] + else: + cmd_ = cmd + sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin) + if wait: + if cmd is None: + stop_at = time.time() + 3 + while stop_at > time.time(): + if os.path.exists(TESTFN): + break + time.sleep(0.001) + else: + warn("couldn't make sure test file was actually created") + else: + wait_for_pid(sproc.pid) + _subprocesses_started.add(psutil.Process(sproc.pid)) + return sproc + + +_testfiles = [] + + +def pyrun(src): + """Run python code 'src' in a separate interpreter. + Return interpreter subprocess. + """ + if PY3: + src = bytes(src, 'ascii') + with tempfile.NamedTemporaryFile( + prefix=TESTFILE_PREFIX, delete=False) as f: + _testfiles.append(f.name) + f.write(src) + f.flush() + subp = get_test_subprocess([PYTHON, f.name], stdout=None, + stderr=None) + wait_for_pid(subp.pid) + return subp + + +def warn(msg): + """Raise a warning msg.""" + warnings.warn(msg, UserWarning) + + +def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE): + """run cmd in a subprocess and return its output. + raises RuntimeError on error. + """ + p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr) + stdout, stderr = p.communicate() + if p.returncode != 0: + raise RuntimeError(stderr) + if stderr: + warn(stderr) + if PY3: + stdout = str(stdout, sys.stdout.encoding) + return stdout.strip() + + +def which(program): + """Same as UNIX which command. Return None on command not found.""" + def is_exe(fpath): + return os.path.isfile(fpath) and os.access(fpath, os.X_OK) + + fpath, fname = os.path.split(program) + if fpath: + if is_exe(program): + return program + else: + for path in os.environ["PATH"].split(os.pathsep): + exe_file = os.path.join(path, program) + if is_exe(exe_file): + return exe_file + return None + + +if POSIX: + def get_kernel_version(): + """Return a tuple such as (2, 6, 36).""" + s = "" + uname = os.uname()[2] + for c in uname: + if c.isdigit() or c == '.': + s += c + else: + break + if not s: + raise ValueError("can't parse %r" % uname) + minor = 0 + micro = 0 + nums = s.split('.') + major = int(nums[0]) + if len(nums) >= 2: + minor = int(nums[1]) + if len(nums) >= 3: + micro = int(nums[2]) + return (major, minor, micro) + + +if LINUX: + RLIMIT_SUPPORT = get_kernel_version() >= (2, 6, 36) +else: + RLIMIT_SUPPORT = False + + +def wait_for_pid(pid, timeout=GLOBAL_TIMEOUT): + """Wait for pid to show up in the process list then return. + Used in the test suite to give time the sub process to initialize. + """ + raise_at = time.time() + timeout + while True: + if pid in psutil.pids(): + # give it one more iteration to allow full initialization + time.sleep(0.01) + return + time.sleep(0.0001) + if time.time() >= raise_at: + raise RuntimeError("Timed out") + + +def wait_for_file(fname, timeout=GLOBAL_TIMEOUT, delete_file=True): + """Wait for a file to be written on disk.""" + stop_at = time.time() + 3 + while time.time() < stop_at: + try: + with open(fname, "r") as f: + data = f.read() + if not data: + continue + if delete_file: + os.remove(fname) + return data + except IOError: + time.sleep(0.001) + raise RuntimeError("timed out (couldn't read file)") + + +def reap_children(search_all=False): + """Kill any subprocess started by this test suite and ensure that + no zombies stick around to hog resources and create problems when + looking for refleaks. + """ + global _subprocesses_started + procs = _subprocesses_started.copy() + if search_all: + this_process = psutil.Process() + for p in this_process.children(recursive=True): + procs.add(p) + for p in procs: + try: + p.terminate() + except psutil.NoSuchProcess: + pass + gone, alive = psutil.wait_procs(procs, timeout=GLOBAL_TIMEOUT) + for p in alive: + warn("couldn't terminate process %s" % p) + try: + p.kill() + except psutil.NoSuchProcess: + pass + _, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT) + if alive: + warn("couldn't not kill processes %s" % str(alive)) + _subprocesses_started = set(alive) + + +def check_ip_address(addr, family): + """Attempts to check IP address's validity.""" + if enum and PY3: + assert isinstance(family, enum.IntEnum), family + if family == AF_INET: + octs = [int(x) for x in addr.split('.')] + assert len(octs) == 4, addr + for num in octs: + assert 0 <= num <= 255, addr + if ipaddress: + if not PY3: + addr = unicode(addr) + ipaddress.IPv4Address(addr) + elif family == AF_INET6: + assert isinstance(addr, str), addr + if ipaddress: + if not PY3: + addr = unicode(addr) + ipaddress.IPv6Address(addr) + elif family == psutil.AF_LINK: + assert re.match('([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr + else: + raise ValueError("unknown family %r", family) + + +def check_connection_ntuple(conn): + """Check validity of a connection namedtuple.""" + valid_conn_states = [getattr(psutil, x) for x in dir(psutil) if + x.startswith('CONN_')] + assert conn[0] == conn.fd + assert conn[1] == conn.family + assert conn[2] == conn.type + assert conn[3] == conn.laddr + assert conn[4] == conn.raddr + assert conn[5] == conn.status + assert conn.type in (SOCK_STREAM, SOCK_DGRAM), repr(conn.type) + assert conn.family in (AF_INET, AF_INET6, AF_UNIX), repr(conn.family) + assert conn.status in valid_conn_states, conn.status + + # check IP address and port sanity + for addr in (conn.laddr, conn.raddr): + if not addr: + continue + if conn.family in (AF_INET, AF_INET6): + assert isinstance(addr, tuple), addr + ip, port = addr + assert isinstance(port, int), port + assert 0 <= port <= 65535, port + check_ip_address(ip, conn.family) + elif conn.family == AF_UNIX: + assert isinstance(addr, (str, None)), addr + else: + raise ValueError("unknown family %r", conn.family) + + if conn.family in (AF_INET, AF_INET6): + # actually try to bind the local socket; ignore IPv6 + # sockets as their address might be represented as + # an IPv4-mapped-address (e.g. "::127.0.0.1") + # and that's rejected by bind() + if conn.family == AF_INET: + s = socket.socket(conn.family, conn.type) + with contextlib.closing(s): + try: + s.bind((conn.laddr[0], 0)) + except socket.error as err: + if err.errno != errno.EADDRNOTAVAIL: + raise + elif conn.family == AF_UNIX: + assert not conn.raddr, repr(conn.raddr) + assert conn.status == psutil.CONN_NONE, conn.status + + if getattr(conn, 'fd', -1) != -1: + assert conn.fd > 0, conn + if hasattr(socket, 'fromfd') and not WINDOWS: + try: + dupsock = socket.fromfd(conn.fd, conn.family, conn.type) + except (socket.error, OSError) as err: + if err.args[0] != errno.EBADF: + raise + else: + with contextlib.closing(dupsock): + assert dupsock.family == conn.family + assert dupsock.type == conn.type + + +def safe_remove(file): + "Convenience function for removing temporary test files" + try: + os.remove(file) + except OSError as err: + if err.errno != errno.ENOENT: + # file is being used by another process + if WINDOWS and isinstance(err, WindowsError) and err.errno == 13: + return + raise + + +def safe_rmdir(dir): + "Convenience function for removing temporary test directories" + try: + os.rmdir(dir) + except OSError as err: + if err.errno != errno.ENOENT: + raise + + +def call_until(fun, expr, timeout=GLOBAL_TIMEOUT): + """Keep calling function for timeout secs and exit if eval() + expression is True. + """ + stop_at = time.time() + timeout + while time.time() < stop_at: + ret = fun() + if eval(expr): + return ret + time.sleep(0.001) + raise RuntimeError('timed out (ret=%r)' % ret) + + +def retry_before_failing(ntimes=None): + """Decorator which runs a test function and retries N times before + actually failing. + """ + def decorator(fun): + @functools.wraps(fun) + def wrapper(*args, **kwargs): + for x in range(ntimes or NO_RETRIES): + try: + return fun(*args, **kwargs) + except AssertionError: + pass + raise + return wrapper + return decorator + + +def skip_on_access_denied(only_if=None): + """Decorator to Ignore AccessDenied exceptions.""" + def decorator(fun): + @functools.wraps(fun) + def wrapper(*args, **kwargs): + try: + return fun(*args, **kwargs) + except psutil.AccessDenied: + if only_if is not None: + if not only_if: + raise + msg = "%r was skipped because it raised AccessDenied" \ + % fun.__name__ + raise unittest.SkipTest(msg) + return wrapper + return decorator + + +def skip_on_not_implemented(only_if=None): + """Decorator to Ignore NotImplementedError exceptions.""" + def decorator(fun): + @functools.wraps(fun) + def wrapper(*args, **kwargs): + try: + return fun(*args, **kwargs) + except NotImplementedError: + if only_if is not None: + if not only_if: + raise + msg = "%r was skipped because it raised NotImplementedError" \ + % fun.__name__ + raise unittest.SkipTest(msg) + return wrapper + return decorator + + +def supports_ipv6(): + """Return True if IPv6 is supported on this platform.""" + if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): + return False + sock = None + try: + sock = socket.socket(AF_INET6, SOCK_STREAM) + sock.bind(("::1", 0)) + except (socket.error, socket.gaierror): + return False + else: + return True + finally: + if sock is not None: + sock.close() + + +if WINDOWS: + def get_winver(): + wv = sys.getwindowsversion() + if hasattr(wv, 'service_pack_major'): # python >= 2.7 + sp = wv.service_pack_major or 0 + else: + r = re.search("\s\d$", wv[4]) + if r: + sp = int(r.group(0)) + else: + sp = 0 + return (wv[0], wv[1], sp) + + +class ThreadTask(threading.Thread): + """A thread object used for running process thread tests.""" + + def __init__(self): + threading.Thread.__init__(self) + self._running = False + self._interval = None + self._flag = threading.Event() + + def __repr__(self): + name = self.__class__.__name__ + return '<%s running=%s at %#x>' % (name, self._running, id(self)) + + def start(self, interval=0.001): + """Start thread and keep it running until an explicit + stop() request. Polls for shutdown every 'timeout' seconds. + """ + if self._running: + raise ValueError("already started") + self._interval = interval + threading.Thread.start(self) + self._flag.wait() + + def run(self): + self._running = True + self._flag.set() + while self._running: + time.sleep(self._interval) + + def stop(self): + """Stop thread execution and and waits until it is stopped.""" + if not self._running: + raise ValueError("already stopped") + self._running = False + self.join() + + +# =================================================================== +# --- System-related API tests +# =================================================================== + +class TestSystemAPIs(unittest.TestCase): + """Tests for system-related APIs.""" + + def setUp(self): + safe_remove(TESTFN) + + def tearDown(self): + reap_children() + + def test_process_iter(self): + self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) + sproc = get_test_subprocess() + self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) + p = psutil.Process(sproc.pid) + p.kill() + p.wait() + self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()]) + + def test_wait_procs(self): + def callback(p): + l.append(p.pid) + + l = [] + sproc1 = get_test_subprocess() + sproc2 = get_test_subprocess() + sproc3 = get_test_subprocess() + procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] + self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1) + self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1) + t = time.time() + gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback) + + self.assertLess(time.time() - t, 0.5) + self.assertEqual(gone, []) + self.assertEqual(len(alive), 3) + self.assertEqual(l, []) + for p in alive: + self.assertFalse(hasattr(p, 'returncode')) + + @retry_before_failing(30) + def test(procs, callback): + gone, alive = psutil.wait_procs(procs, timeout=0.03, + callback=callback) + self.assertEqual(len(gone), 1) + self.assertEqual(len(alive), 2) + return gone, alive + + sproc3.terminate() + gone, alive = test(procs, callback) + self.assertIn(sproc3.pid, [x.pid for x in gone]) + if POSIX: + self.assertEqual(gone.pop().returncode, signal.SIGTERM) + else: + self.assertEqual(gone.pop().returncode, 1) + self.assertEqual(l, [sproc3.pid]) + for p in alive: + self.assertFalse(hasattr(p, 'returncode')) + + @retry_before_failing(30) + def test(procs, callback): + gone, alive = psutil.wait_procs(procs, timeout=0.03, + callback=callback) + self.assertEqual(len(gone), 3) + self.assertEqual(len(alive), 0) + return gone, alive + + sproc1.terminate() + sproc2.terminate() + gone, alive = test(procs, callback) + self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid])) + for p in gone: + self.assertTrue(hasattr(p, 'returncode')) + + def test_wait_procs_no_timeout(self): + sproc1 = get_test_subprocess() + sproc2 = get_test_subprocess() + sproc3 = get_test_subprocess() + procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] + for p in procs: + p.terminate() + gone, alive = psutil.wait_procs(procs) + + def test_boot_time(self): + bt = psutil.boot_time() + self.assertIsInstance(bt, float) + self.assertGreater(bt, 0) + self.assertLess(bt, time.time()) + + @unittest.skipUnless(POSIX, 'posix only') + def test_PAGESIZE(self): + # pagesize is used internally to perform different calculations + # and it's determined by using SC_PAGE_SIZE; make sure + # getpagesize() returns the same value. + import resource + self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize()) + + def test_virtual_memory(self): + mem = psutil.virtual_memory() + assert mem.total > 0, mem + assert mem.available > 0, mem + assert 0 <= mem.percent <= 100, mem + assert mem.used > 0, mem + assert mem.free >= 0, mem + for name in mem._fields: + value = getattr(mem, name) + if name != 'percent': + self.assertIsInstance(value, (int, long)) + if name != 'total': + if not value >= 0: + self.fail("%r < 0 (%s)" % (name, value)) + if value > mem.total: + self.fail("%r > total (total=%s, %s=%s)" + % (name, mem.total, name, value)) + + def test_swap_memory(self): + mem = psutil.swap_memory() + assert mem.total >= 0, mem + assert mem.used >= 0, mem + if mem.total > 0: + # likely a system with no swap partition + assert mem.free > 0, mem + else: + assert mem.free == 0, mem + assert 0 <= mem.percent <= 100, mem + assert mem.sin >= 0, mem + assert mem.sout >= 0, mem + + def test_pid_exists(self): + sproc = get_test_subprocess(wait=True) + self.assertTrue(psutil.pid_exists(sproc.pid)) + p = psutil.Process(sproc.pid) + p.kill() + p.wait() + self.assertFalse(psutil.pid_exists(sproc.pid)) + self.assertFalse(psutil.pid_exists(-1)) + self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids()) + # pid 0 + psutil.pid_exists(0) == 0 in psutil.pids() + + def test_pid_exists_2(self): + reap_children() + pids = psutil.pids() + for pid in pids: + try: + assert psutil.pid_exists(pid) + except AssertionError: + # in case the process disappeared in meantime fail only + # if it is no longer in psutil.pids() + time.sleep(.1) + if pid in psutil.pids(): + self.fail(pid) + pids = range(max(pids) + 5000, max(pids) + 6000) + for pid in pids: + self.assertFalse(psutil.pid_exists(pid), msg=pid) + + def test_pids(self): + plist = [x.pid for x in psutil.process_iter()] + pidlist = psutil.pids() + self.assertEqual(plist.sort(), pidlist.sort()) + # make sure every pid is unique + self.assertEqual(len(pidlist), len(set(pidlist))) + + def test_test(self): + # test for psutil.test() function + stdout = sys.stdout + sys.stdout = DEVNULL + try: + psutil.test() + finally: + sys.stdout = stdout + + def test_cpu_count(self): + logical = psutil.cpu_count() + self.assertEqual(logical, len(psutil.cpu_times(percpu=True))) + self.assertGreaterEqual(logical, 1) + # + if LINUX: + with open("/proc/cpuinfo") as fd: + cpuinfo_data = fd.read() + if "physical id" not in cpuinfo_data: + raise unittest.SkipTest("cpuinfo doesn't include physical id") + physical = psutil.cpu_count(logical=False) + self.assertGreaterEqual(physical, 1) + self.assertGreaterEqual(logical, physical) + + def test_sys_cpu_times(self): + total = 0 + times = psutil.cpu_times() + sum(times) + for cp_time in times: + self.assertIsInstance(cp_time, float) + self.assertGreaterEqual(cp_time, 0.0) + total += cp_time + self.assertEqual(total, sum(times)) + str(times) + if not WINDOWS: + # CPU times are always supposed to increase over time or + # remain the same but never go backwards, see: + # https://github.com/giampaolo/psutil/issues/392 + last = psutil.cpu_times() + for x in range(100): + new = psutil.cpu_times() + for field in new._fields: + new_t = getattr(new, field) + last_t = getattr(last, field) + self.assertGreaterEqual(new_t, last_t, + msg="%s %s" % (new_t, last_t)) + last = new + + def test_sys_cpu_times2(self): + t1 = sum(psutil.cpu_times()) + time.sleep(0.1) + t2 = sum(psutil.cpu_times()) + difference = t2 - t1 + if not difference >= 0.05: + self.fail("difference %s" % difference) + + def test_sys_per_cpu_times(self): + for times in psutil.cpu_times(percpu=True): + total = 0 + sum(times) + for cp_time in times: + self.assertIsInstance(cp_time, float) + self.assertGreaterEqual(cp_time, 0.0) + total += cp_time + self.assertEqual(total, sum(times)) + str(times) + self.assertEqual(len(psutil.cpu_times(percpu=True)[0]), + len(psutil.cpu_times(percpu=False))) + + # Note: in theory CPU times are always supposed to increase over + # time or remain the same but never go backwards. In practice + # sometimes this is not the case. + # This issue seemd to be afflict Windows: + # https://github.com/giampaolo/psutil/issues/392 + # ...but it turns out also Linux (rarely) behaves the same. + # last = psutil.cpu_times(percpu=True) + # for x in range(100): + # new = psutil.cpu_times(percpu=True) + # for index in range(len(new)): + # newcpu = new[index] + # lastcpu = last[index] + # for field in newcpu._fields: + # new_t = getattr(newcpu, field) + # last_t = getattr(lastcpu, field) + # self.assertGreaterEqual( + # new_t, last_t, msg="%s %s" % (lastcpu, newcpu)) + # last = new + + def test_sys_per_cpu_times_2(self): + tot1 = psutil.cpu_times(percpu=True) + stop_at = time.time() + 0.1 + while True: + if time.time() >= stop_at: + break + tot2 = psutil.cpu_times(percpu=True) + for t1, t2 in zip(tot1, tot2): + t1, t2 = sum(t1), sum(t2) + difference = t2 - t1 + if difference >= 0.05: + return + self.fail() + + def _test_cpu_percent(self, percent, last_ret, new_ret): + try: + self.assertIsInstance(percent, float) + self.assertGreaterEqual(percent, 0.0) + self.assertIsNot(percent, -0.0) + self.assertLessEqual(percent, 100.0 * psutil.cpu_count()) + except AssertionError as err: + raise AssertionError("\n%s\nlast=%s\nnew=%s" % ( + err, pprint.pformat(last_ret), pprint.pformat(new_ret))) + + def test_sys_cpu_percent(self): + last = psutil.cpu_percent(interval=0.001) + for x in range(100): + new = psutil.cpu_percent(interval=None) + self._test_cpu_percent(new, last, new) + last = new + + def test_sys_per_cpu_percent(self): + last = psutil.cpu_percent(interval=0.001, percpu=True) + self.assertEqual(len(last), psutil.cpu_count()) + for x in range(100): + new = psutil.cpu_percent(interval=None, percpu=True) + for percent in new: + self._test_cpu_percent(percent, last, new) + last = new + + def test_sys_cpu_times_percent(self): + last = psutil.cpu_times_percent(interval=0.001) + for x in range(100): + new = psutil.cpu_times_percent(interval=None) + for percent in new: + self._test_cpu_percent(percent, last, new) + self._test_cpu_percent(sum(new), last, new) + last = new + + def test_sys_per_cpu_times_percent(self): + last = psutil.cpu_times_percent(interval=0.001, percpu=True) + self.assertEqual(len(last), psutil.cpu_count()) + for x in range(100): + new = psutil.cpu_times_percent(interval=None, percpu=True) + for cpu in new: + for percent in cpu: + self._test_cpu_percent(percent, last, new) + self._test_cpu_percent(sum(cpu), last, new) + last = new + + def test_sys_per_cpu_times_percent_negative(self): + # see: https://github.com/giampaolo/psutil/issues/645 + psutil.cpu_times_percent(percpu=True) + zero_times = [x._make([0 for x in range(len(x._fields))]) + for x in psutil.cpu_times(percpu=True)] + with mock.patch('psutil.cpu_times', return_value=zero_times): + for cpu in psutil.cpu_times_percent(percpu=True): + for percent in cpu: + self._test_cpu_percent(percent, None, None) + + @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), + "os.statvfs() function not available on this platform") + def test_disk_usage(self): + usage = psutil.disk_usage(os.getcwd()) + assert usage.total > 0, usage + assert usage.used > 0, usage + assert usage.free > 0, usage + assert usage.total > usage.used, usage + assert usage.total > usage.free, usage + assert 0 <= usage.percent <= 100, usage.percent + if hasattr(shutil, 'disk_usage'): + # py >= 3.3, see: http://bugs.python.org/issue12442 + shutil_usage = shutil.disk_usage(os.getcwd()) + tolerance = 5 * 1024 * 1024 # 5MB + self.assertEqual(usage.total, shutil_usage.total) + self.assertAlmostEqual(usage.free, shutil_usage.free, + delta=tolerance) + self.assertAlmostEqual(usage.used, shutil_usage.used, + delta=tolerance) + + # if path does not exist OSError ENOENT is expected across + # all platforms + fname = tempfile.mktemp() + try: + psutil.disk_usage(fname) + except OSError as err: + if err.args[0] != errno.ENOENT: + raise + else: + self.fail("OSError not raised") + + @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), + "os.statvfs() function not available on this platform") + def test_disk_usage_unicode(self): + # see: https://github.com/giampaolo/psutil/issues/416 + # XXX this test is not really reliable as it always fails on + # Python 3.X (2.X is fine) + try: + safe_rmdir(TESTFN_UNICODE) + os.mkdir(TESTFN_UNICODE) + psutil.disk_usage(TESTFN_UNICODE) + safe_rmdir(TESTFN_UNICODE) + except UnicodeEncodeError: + pass + + @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), + "os.statvfs() function not available on this platform") + @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis") + def test_disk_partitions(self): + # all = False + ls = psutil.disk_partitions(all=False) + # on travis we get: + # self.assertEqual(p.cpu_affinity(), [n]) + # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0] + self.assertTrue(ls, msg=ls) + for disk in ls: + if WINDOWS and 'cdrom' in disk.opts: + continue + if not POSIX: + assert os.path.exists(disk.device), disk + else: + # we cannot make any assumption about this, see: + # http://goo.gl/p9c43 + disk.device + if SUNOS: + # on solaris apparently mount points can also be files + assert os.path.exists(disk.mountpoint), disk + else: + assert os.path.isdir(disk.mountpoint), disk + assert disk.fstype, disk + self.assertIsInstance(disk.opts, str) + + # all = True + ls = psutil.disk_partitions(all=True) + self.assertTrue(ls, msg=ls) + for disk in psutil.disk_partitions(all=True): + if not WINDOWS: + try: + os.stat(disk.mountpoint) + except OSError as err: + # http://mail.python.org/pipermail/python-dev/ + # 2012-June/120787.html + if err.errno not in (errno.EPERM, errno.EACCES): + raise + else: + if SUNOS: + # on solaris apparently mount points can also be files + assert os.path.exists(disk.mountpoint), disk + else: + assert os.path.isdir(disk.mountpoint), disk + self.assertIsInstance(disk.fstype, str) + self.assertIsInstance(disk.opts, str) + + def find_mount_point(path): + path = os.path.abspath(path) + while not os.path.ismount(path): + path = os.path.dirname(path) + return path + + mount = find_mount_point(__file__) + mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] + self.assertIn(mount, mounts) + psutil.disk_usage(mount) + + @skip_on_access_denied() + def test_net_connections(self): + def check(cons, families, types_): + for conn in cons: + self.assertIn(conn.family, families, msg=conn) + if conn.family != getattr(socket, 'AF_UNIX', object()): + self.assertIn(conn.type, types_, msg=conn) + + from psutil._common import conn_tmap + for kind, groups in conn_tmap.items(): + if SUNOS and kind == 'unix': + continue + families, types_ = groups + cons = psutil.net_connections(kind) + self.assertEqual(len(cons), len(set(cons))) + check(cons, families, types_) + + def test_net_io_counters(self): + def check_ntuple(nt): + self.assertEqual(nt[0], nt.bytes_sent) + self.assertEqual(nt[1], nt.bytes_recv) + self.assertEqual(nt[2], nt.packets_sent) + self.assertEqual(nt[3], nt.packets_recv) + self.assertEqual(nt[4], nt.errin) + self.assertEqual(nt[5], nt.errout) + self.assertEqual(nt[6], nt.dropin) + self.assertEqual(nt[7], nt.dropout) + assert nt.bytes_sent >= 0, nt + assert nt.bytes_recv >= 0, nt + assert nt.packets_sent >= 0, nt + assert nt.packets_recv >= 0, nt + assert nt.errin >= 0, nt + assert nt.errout >= 0, nt + assert nt.dropin >= 0, nt + assert nt.dropout >= 0, nt + + ret = psutil.net_io_counters(pernic=False) + check_ntuple(ret) + ret = psutil.net_io_counters(pernic=True) + self.assertNotEqual(ret, []) + for key in ret: + self.assertTrue(key) + check_ntuple(ret[key]) + + def test_net_if_addrs(self): + nics = psutil.net_if_addrs() + assert nics, nics + + # Not reliable on all platforms (net_if_addrs() reports more + # interfaces). + # self.assertEqual(sorted(nics.keys()), + # sorted(psutil.net_io_counters(pernic=True).keys())) + + families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK]) + for nic, addrs in nics.items(): + self.assertEqual(len(set(addrs)), len(addrs)) + for addr in addrs: + self.assertIsInstance(addr.family, int) + self.assertIsInstance(addr.address, str) + self.assertIsInstance(addr.netmask, (str, type(None))) + self.assertIsInstance(addr.broadcast, (str, type(None))) + self.assertIn(addr.family, families) + if sys.version_info >= (3, 4): + self.assertIsInstance(addr.family, enum.IntEnum) + if addr.family == socket.AF_INET: + s = socket.socket(addr.family) + with contextlib.closing(s): + s.bind((addr.address, 0)) + elif addr.family == socket.AF_INET6: + info = socket.getaddrinfo( + addr.address, 0, socket.AF_INET6, socket.SOCK_STREAM, + 0, socket.AI_PASSIVE)[0] + af, socktype, proto, canonname, sa = info + s = socket.socket(af, socktype, proto) + with contextlib.closing(s): + s.bind(sa) + for ip in (addr.address, addr.netmask, addr.broadcast): + if ip is not None: + # TODO: skip AF_INET6 for now because I get: + # AddressValueError: Only hex digits permitted in + # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0' + if addr.family != AF_INET6: + check_ip_address(ip, addr.family) + + if BSD or OSX or SUNOS: + if hasattr(socket, "AF_LINK"): + self.assertEqual(psutil.AF_LINK, socket.AF_LINK) + elif LINUX: + self.assertEqual(psutil.AF_LINK, socket.AF_PACKET) + elif WINDOWS: + self.assertEqual(psutil.AF_LINK, -1) + + @unittest.skipIf(TRAVIS, "EPERM on travis") + def test_net_if_stats(self): + nics = psutil.net_if_stats() + assert nics, nics + all_duplexes = (psutil.NIC_DUPLEX_FULL, + psutil.NIC_DUPLEX_HALF, + psutil.NIC_DUPLEX_UNKNOWN) + for nic, stats in nics.items(): + isup, duplex, speed, mtu = stats + self.assertIsInstance(isup, bool) + self.assertIn(duplex, all_duplexes) + self.assertIn(duplex, all_duplexes) + self.assertGreaterEqual(speed, 0) + self.assertGreaterEqual(mtu, 0) + + @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'), + '/proc/diskstats not available on this linux version') + @unittest.skipIf(APPVEYOR, + "can't find any physical disk on Appveyor") + def test_disk_io_counters(self): + def check_ntuple(nt): + self.assertEqual(nt[0], nt.read_count) + self.assertEqual(nt[1], nt.write_count) + self.assertEqual(nt[2], nt.read_bytes) + self.assertEqual(nt[3], nt.write_bytes) + self.assertEqual(nt[4], nt.read_time) + self.assertEqual(nt[5], nt.write_time) + assert nt.read_count >= 0, nt + assert nt.write_count >= 0, nt + assert nt.read_bytes >= 0, nt + assert nt.write_bytes >= 0, nt + assert nt.read_time >= 0, nt + assert nt.write_time >= 0, nt + + ret = psutil.disk_io_counters(perdisk=False) + check_ntuple(ret) + ret = psutil.disk_io_counters(perdisk=True) + # make sure there are no duplicates + self.assertEqual(len(ret), len(set(ret))) + for key in ret: + assert key, key + check_ntuple(ret[key]) + if LINUX and key[-1].isdigit(): + # if 'sda1' is listed 'sda' shouldn't, see: + # https://github.com/giampaolo/psutil/issues/338 + while key[-1].isdigit(): + key = key[:-1] + self.assertNotIn(key, ret.keys()) + + def test_users(self): + users = psutil.users() + if not APPVEYOR: + self.assertNotEqual(users, []) + for user in users: + assert user.name, user + user.terminal + user.host + assert user.started > 0.0, user + datetime.datetime.fromtimestamp(user.started) + + +# =================================================================== +# --- psutil.Process class tests +# =================================================================== + +class TestProcess(unittest.TestCase): + """Tests for psutil.Process class.""" + + def setUp(self): + safe_remove(TESTFN) + + def tearDown(self): + reap_children() + + def test_pid(self): + self.assertEqual(psutil.Process().pid, os.getpid()) + sproc = get_test_subprocess() + self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) + + def test_kill(self): + sproc = get_test_subprocess(wait=True) + test_pid = sproc.pid + p = psutil.Process(test_pid) + p.kill() + sig = p.wait() + self.assertFalse(psutil.pid_exists(test_pid)) + if POSIX: + self.assertEqual(sig, signal.SIGKILL) + + def test_terminate(self): + sproc = get_test_subprocess(wait=True) + test_pid = sproc.pid + p = psutil.Process(test_pid) + p.terminate() + sig = p.wait() + self.assertFalse(psutil.pid_exists(test_pid)) + if POSIX: + self.assertEqual(sig, signal.SIGTERM) + + def test_send_signal(self): + sig = signal.SIGKILL if POSIX else signal.SIGTERM + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.send_signal(sig) + exit_sig = p.wait() + self.assertFalse(psutil.pid_exists(p.pid)) + if POSIX: + self.assertEqual(exit_sig, sig) + # + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.send_signal(sig) + with mock.patch('psutil.os.kill', + side_effect=OSError(errno.ESRCH, "")) as fun: + with self.assertRaises(psutil.NoSuchProcess): + p.send_signal(sig) + assert fun.called + # + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.send_signal(sig) + with mock.patch('psutil.os.kill', + side_effect=OSError(errno.EPERM, "")) as fun: + with self.assertRaises(psutil.AccessDenied): + p.send_signal(sig) + assert fun.called + + def test_wait(self): + # check exit code signal + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.kill() + code = p.wait() + if POSIX: + self.assertEqual(code, signal.SIGKILL) + else: + self.assertEqual(code, 0) + self.assertFalse(p.is_running()) + + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.terminate() + code = p.wait() + if POSIX: + self.assertEqual(code, signal.SIGTERM) + else: + self.assertEqual(code, 0) + self.assertFalse(p.is_running()) + + # check sys.exit() code + code = "import time, sys; time.sleep(0.01); sys.exit(5);" + sproc = get_test_subprocess([PYTHON, "-c", code]) + p = psutil.Process(sproc.pid) + self.assertEqual(p.wait(), 5) + self.assertFalse(p.is_running()) + + # Test wait() issued twice. + # It is not supposed to raise NSP when the process is gone. + # On UNIX this should return None, on Windows it should keep + # returning the exit code. + sproc = get_test_subprocess([PYTHON, "-c", code]) + p = psutil.Process(sproc.pid) + self.assertEqual(p.wait(), 5) + self.assertIn(p.wait(), (5, None)) + + # test timeout + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.name() + self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) + + # timeout < 0 not allowed + self.assertRaises(ValueError, p.wait, -1) + + # XXX why is this skipped on Windows? + @unittest.skipUnless(POSIX, 'skipped on Windows') + def test_wait_non_children(self): + # test wait() against processes which are not our children + code = "import sys;" + code += "from subprocess import Popen, PIPE;" + code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON + code += "sp = Popen(cmd, stdout=PIPE);" + code += "sys.stdout.write(str(sp.pid));" + sproc = get_test_subprocess([PYTHON, "-c", code], + stdout=subprocess.PIPE) + grandson_pid = int(sproc.stdout.read()) + grandson_proc = psutil.Process(grandson_pid) + try: + self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01) + grandson_proc.kill() + ret = grandson_proc.wait() + self.assertEqual(ret, None) + finally: + if grandson_proc.is_running(): + grandson_proc.kill() + grandson_proc.wait() + + def test_wait_timeout_0(self): + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + self.assertRaises(psutil.TimeoutExpired, p.wait, 0) + p.kill() + stop_at = time.time() + 2 + while True: + try: + code = p.wait(0) + except psutil.TimeoutExpired: + if time.time() >= stop_at: + raise + else: + break + if POSIX: + self.assertEqual(code, signal.SIGKILL) + else: + self.assertEqual(code, 0) + self.assertFalse(p.is_running()) + + def test_cpu_percent(self): + p = psutil.Process() + p.cpu_percent(interval=0.001) + p.cpu_percent(interval=0.001) + for x in range(100): + percent = p.cpu_percent(interval=None) + self.assertIsInstance(percent, float) + self.assertGreaterEqual(percent, 0.0) + if not POSIX: + self.assertLessEqual(percent, 100.0) + else: + self.assertGreaterEqual(percent, 0.0) + + def test_cpu_times(self): + times = psutil.Process().cpu_times() + assert (times.user > 0.0) or (times.system > 0.0), times + # make sure returned values can be pretty printed with strftime + time.strftime("%H:%M:%S", time.localtime(times.user)) + time.strftime("%H:%M:%S", time.localtime(times.system)) + + # Test Process.cpu_times() against os.times() + # os.times() is broken on Python 2.6 + # http://bugs.python.org/issue1040026 + # XXX fails on OSX: not sure if it's for os.times(). We should + # try this with Python 2.7 and re-enable the test. + + @unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX, + 'os.times() is not reliable on this Python version') + def test_cpu_times2(self): + user_time, kernel_time = psutil.Process().cpu_times() + utime, ktime = os.times()[:2] + + # Use os.times()[:2] as base values to compare our results + # using a tolerance of +/- 0.1 seconds. + # It will fail if the difference between the values is > 0.1s. + if (max([user_time, utime]) - min([user_time, utime])) > 0.1: + self.fail("expected: %s, found: %s" % (utime, user_time)) + + if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: + self.fail("expected: %s, found: %s" % (ktime, kernel_time)) + + def test_create_time(self): + sproc = get_test_subprocess(wait=True) + now = time.time() + p = psutil.Process(sproc.pid) + create_time = p.create_time() + + # Use time.time() as base value to compare our result using a + # tolerance of +/- 1 second. + # It will fail if the difference between the values is > 2s. + difference = abs(create_time - now) + if difference > 2: + self.fail("expected: %s, found: %s, difference: %s" + % (now, create_time, difference)) + + # make sure returned value can be pretty printed with strftime + time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time())) + + @unittest.skipIf(WINDOWS, 'Windows only') + def test_terminal(self): + terminal = psutil.Process().terminal() + if sys.stdin.isatty(): + self.assertEqual(terminal, sh('tty')) + else: + assert terminal, repr(terminal) + + @unittest.skipUnless(LINUX or BSD or WINDOWS, + 'not available on this platform') + @skip_on_not_implemented(only_if=LINUX) + def test_io_counters(self): + p = psutil.Process() + # test reads + io1 = p.io_counters() + with open(PYTHON, 'rb') as f: + f.read() + io2 = p.io_counters() + if not BSD: + assert io2.read_count > io1.read_count, (io1, io2) + self.assertEqual(io2.write_count, io1.write_count) + assert io2.read_bytes >= io1.read_bytes, (io1, io2) + assert io2.write_bytes >= io1.write_bytes, (io1, io2) + # test writes + io1 = p.io_counters() + with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f: + if PY3: + f.write(bytes("x" * 1000000, 'ascii')) + else: + f.write("x" * 1000000) + io2 = p.io_counters() + assert io2.write_count >= io1.write_count, (io1, io2) + assert io2.write_bytes >= io1.write_bytes, (io1, io2) + assert io2.read_count >= io1.read_count, (io1, io2) + assert io2.read_bytes >= io1.read_bytes, (io1, io2) + + @unittest.skipUnless(LINUX or (WINDOWS and get_winver() >= WIN_VISTA), + 'Linux and Windows Vista only') + @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis") + def test_ionice(self): + if LINUX: + from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, + IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE) + self.assertEqual(IOPRIO_CLASS_NONE, 0) + self.assertEqual(IOPRIO_CLASS_RT, 1) + self.assertEqual(IOPRIO_CLASS_BE, 2) + self.assertEqual(IOPRIO_CLASS_IDLE, 3) + p = psutil.Process() + try: + p.ionice(2) + ioclass, value = p.ionice() + if enum is not None: + self.assertIsInstance(ioclass, enum.IntEnum) + self.assertEqual(ioclass, 2) + self.assertEqual(value, 4) + # + p.ionice(3) + ioclass, value = p.ionice() + self.assertEqual(ioclass, 3) + self.assertEqual(value, 0) + # + p.ionice(2, 0) + ioclass, value = p.ionice() + self.assertEqual(ioclass, 2) + self.assertEqual(value, 0) + p.ionice(2, 7) + ioclass, value = p.ionice() + self.assertEqual(ioclass, 2) + self.assertEqual(value, 7) + # + self.assertRaises(ValueError, p.ionice, 2, 10) + self.assertRaises(ValueError, p.ionice, 2, -1) + self.assertRaises(ValueError, p.ionice, 4) + self.assertRaises(TypeError, p.ionice, 2, "foo") + self.assertRaisesRegexp( + ValueError, "can't specify value with IOPRIO_CLASS_NONE", + p.ionice, psutil.IOPRIO_CLASS_NONE, 1) + self.assertRaisesRegexp( + ValueError, "can't specify value with IOPRIO_CLASS_IDLE", + p.ionice, psutil.IOPRIO_CLASS_IDLE, 1) + self.assertRaisesRegexp( + ValueError, "'ioclass' argument must be specified", + p.ionice, value=1) + finally: + p.ionice(IOPRIO_CLASS_NONE) + else: + p = psutil.Process() + original = p.ionice() + self.assertIsInstance(original, int) + try: + value = 0 # very low + if original == value: + value = 1 # low + p.ionice(value) + self.assertEqual(p.ionice(), value) + finally: + p.ionice(original) + # + self.assertRaises(ValueError, p.ionice, 3) + self.assertRaises(TypeError, p.ionice, 2, 1) + + @unittest.skipUnless(LINUX and RLIMIT_SUPPORT, + "only available on Linux >= 2.6.36") + def test_rlimit_get(self): + import resource + p = psutil.Process(os.getpid()) + names = [x for x in dir(psutil) if x.startswith('RLIMIT')] + assert names, names + for name in names: + value = getattr(psutil, name) + self.assertGreaterEqual(value, 0) + if name in dir(resource): + self.assertEqual(value, getattr(resource, name)) + self.assertEqual(p.rlimit(value), resource.getrlimit(value)) + else: + ret = p.rlimit(value) + self.assertEqual(len(ret), 2) + self.assertGreaterEqual(ret[0], -1) + self.assertGreaterEqual(ret[1], -1) + + @unittest.skipUnless(LINUX and RLIMIT_SUPPORT, + "only available on Linux >= 2.6.36") + def test_rlimit_set(self): + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.rlimit(psutil.RLIMIT_NOFILE, (5, 5)) + self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5)) + # If pid is 0 prlimit() applies to the calling process and + # we don't want that. + with self.assertRaises(ValueError): + psutil._psplatform.Process(0).rlimit(0) + with self.assertRaises(ValueError): + p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5)) + + def test_num_threads(self): + # on certain platforms such as Linux we might test for exact + # thread number, since we always have with 1 thread per process, + # but this does not apply across all platforms (OSX, Windows) + p = psutil.Process() + step1 = p.num_threads() + + thread = ThreadTask() + thread.start() + try: + step2 = p.num_threads() + self.assertEqual(step2, step1 + 1) + thread.stop() + finally: + if thread._running: + thread.stop() + + @unittest.skipUnless(WINDOWS, 'Windows only') + def test_num_handles(self): + # a better test is done later into test/_windows.py + p = psutil.Process() + self.assertGreater(p.num_handles(), 0) + + def test_threads(self): + p = psutil.Process() + step1 = p.threads() + + thread = ThreadTask() + thread.start() + + try: + step2 = p.threads() + self.assertEqual(len(step2), len(step1) + 1) + # on Linux, first thread id is supposed to be this process + if LINUX: + self.assertEqual(step2[0].id, os.getpid()) + athread = step2[0] + # test named tuple + self.assertEqual(athread.id, athread[0]) + self.assertEqual(athread.user_time, athread[1]) + self.assertEqual(athread.system_time, athread[2]) + # test num threads + thread.stop() + finally: + if thread._running: + thread.stop() + + def test_memory_info(self): + p = psutil.Process() + + # step 1 - get a base value to compare our results + rss1, vms1 = p.memory_info() + percent1 = p.memory_percent() + self.assertGreater(rss1, 0) + self.assertGreater(vms1, 0) + + # step 2 - allocate some memory + memarr = [None] * 1500000 + + rss2, vms2 = p.memory_info() + percent2 = p.memory_percent() + # make sure that the memory usage bumped up + self.assertGreater(rss2, rss1) + self.assertGreaterEqual(vms2, vms1) # vms might be equal + self.assertGreater(percent2, percent1) + del memarr + + # def test_memory_info_ex(self): + # # tested later in fetch all test suite + + def test_memory_maps(self): + p = psutil.Process() + maps = p.memory_maps() + paths = [x for x in maps] + self.assertEqual(len(paths), len(set(paths))) + ext_maps = p.memory_maps(grouped=False) + + for nt in maps: + if not nt.path.startswith('['): + assert os.path.isabs(nt.path), nt.path + if POSIX: + assert os.path.exists(nt.path), nt.path + else: + # XXX - On Windows we have this strange behavior with + # 64 bit dlls: they are visible via explorer but cannot + # be accessed via os.stat() (wtf?). + if '64' not in os.path.basename(nt.path): + assert os.path.exists(nt.path), nt.path + for nt in ext_maps: + for fname in nt._fields: + value = getattr(nt, fname) + if fname == 'path': + continue + elif fname in ('addr', 'perms'): + assert value, value + else: + self.assertIsInstance(value, (int, long)) + assert value >= 0, value + + def test_memory_percent(self): + p = psutil.Process() + self.assertGreater(p.memory_percent(), 0.0) + + def test_is_running(self): + sproc = get_test_subprocess(wait=True) + p = psutil.Process(sproc.pid) + assert p.is_running() + assert p.is_running() + p.kill() + p.wait() + assert not p.is_running() + assert not p.is_running() + + def test_exe(self): + sproc = get_test_subprocess(wait=True) + exe = psutil.Process(sproc.pid).exe() + try: + self.assertEqual(exe, PYTHON) + except AssertionError: + if WINDOWS and len(exe) == len(PYTHON): + # on Windows we don't care about case sensitivity + self.assertEqual(exe.lower(), PYTHON.lower()) + else: + # certain platforms such as BSD are more accurate returning: + # "/usr/local/bin/python2.7" + # ...instead of: + # "/usr/local/bin/python" + # We do not want to consider this difference in accuracy + # an error. + ver = "%s.%s" % (sys.version_info[0], sys.version_info[1]) + self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, '')) + + def test_cmdline(self): + cmdline = [PYTHON, "-c", "import time; time.sleep(60)"] + sproc = get_test_subprocess(cmdline, wait=True) + self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()), + ' '.join(cmdline)) + + def test_name(self): + sproc = get_test_subprocess(PYTHON, wait=True) + name = psutil.Process(sproc.pid).name().lower() + pyexe = os.path.basename(os.path.realpath(sys.executable)).lower() + assert pyexe.startswith(name), (pyexe, name) + + @unittest.skipUnless(POSIX, "posix only") + # TODO: add support for other compilers + @unittest.skipUnless(which("gcc"), "gcc not available") + def test_prog_w_funky_name(self): + # Test that name(), exe() and cmdline() correctly handle programs + # with funky chars such as spaces and ")", see: + # https://github.com/giampaolo/psutil/issues/628 + funky_name = "/tmp/foo bar )" + _, c_file = tempfile.mkstemp(prefix='psutil-', suffix='.c', dir="/tmp") + self.addCleanup(lambda: safe_remove(c_file)) + self.addCleanup(lambda: safe_remove(funky_name)) + with open(c_file, "w") as f: + f.write("void main() { pause(); }") + subprocess.check_call(["gcc", c_file, "-o", funky_name]) + sproc = get_test_subprocess( + [funky_name, "arg1", "arg2", "", "arg3", ""]) + p = psutil.Process(sproc.pid) + # ...in order to try to prevent occasional failures on travis + wait_for_pid(p.pid) + self.assertEqual(p.name(), "foo bar )") + self.assertEqual(p.exe(), "/tmp/foo bar )") + self.assertEqual( + p.cmdline(), ["/tmp/foo bar )", "arg1", "arg2", "", "arg3", ""]) + + @unittest.skipUnless(POSIX, 'posix only') + def test_uids(self): + p = psutil.Process() + real, effective, saved = p.uids() + # os.getuid() refers to "real" uid + self.assertEqual(real, os.getuid()) + # os.geteuid() refers to "effective" uid + self.assertEqual(effective, os.geteuid()) + # no such thing as os.getsuid() ("saved" uid), but starting + # from python 2.7 we have os.getresuid()[2] + if hasattr(os, "getresuid"): + self.assertEqual(saved, os.getresuid()[2]) + + @unittest.skipUnless(POSIX, 'posix only') + def test_gids(self): + p = psutil.Process() + real, effective, saved = p.gids() + # os.getuid() refers to "real" uid + self.assertEqual(real, os.getgid()) + # os.geteuid() refers to "effective" uid + self.assertEqual(effective, os.getegid()) + # no such thing as os.getsuid() ("saved" uid), but starting + # from python 2.7 we have os.getresgid()[2] + if hasattr(os, "getresuid"): + self.assertEqual(saved, os.getresgid()[2]) + + def test_nice(self): + p = psutil.Process() + self.assertRaises(TypeError, p.nice, "str") + if WINDOWS: + try: + init = p.nice() + if sys.version_info > (3, 4): + self.assertIsInstance(init, enum.IntEnum) + else: + self.assertIsInstance(init, int) + self.assertEqual(init, psutil.NORMAL_PRIORITY_CLASS) + p.nice(psutil.HIGH_PRIORITY_CLASS) + self.assertEqual(p.nice(), psutil.HIGH_PRIORITY_CLASS) + p.nice(psutil.NORMAL_PRIORITY_CLASS) + self.assertEqual(p.nice(), psutil.NORMAL_PRIORITY_CLASS) + finally: + p.nice(psutil.NORMAL_PRIORITY_CLASS) + else: + try: + first_nice = p.nice() + p.nice(1) + self.assertEqual(p.nice(), 1) + # going back to previous nice value raises + # AccessDenied on OSX + if not OSX: + p.nice(0) + self.assertEqual(p.nice(), 0) + except psutil.AccessDenied: + pass + finally: + try: + p.nice(first_nice) + except psutil.AccessDenied: + pass + + def test_status(self): + p = psutil.Process() + self.assertEqual(p.status(), psutil.STATUS_RUNNING) + + def test_username(self): + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + if POSIX: + import pwd + self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name) + with mock.patch("psutil.pwd.getpwuid", + side_effect=KeyError) as fun: + p.username() == str(p.uids().real) + assert fun.called + + elif WINDOWS and 'USERNAME' in os.environ: + expected_username = os.environ['USERNAME'] + expected_domain = os.environ['USERDOMAIN'] + domain, username = p.username().split('\\') + self.assertEqual(domain, expected_domain) + self.assertEqual(username, expected_username) + else: + p.username() + + def test_cwd(self): + sproc = get_test_subprocess(wait=True) + p = psutil.Process(sproc.pid) + self.assertEqual(p.cwd(), os.getcwd()) + + def test_cwd_2(self): + cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(60)"] + sproc = get_test_subprocess(cmd, wait=True) + p = psutil.Process(sproc.pid) + call_until(p.cwd, "ret == os.path.dirname(os.getcwd())") + + @unittest.skipUnless(WINDOWS or LINUX or BSD, + 'not available on this platform') + @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis") + def test_cpu_affinity(self): + p = psutil.Process() + initial = p.cpu_affinity() + if hasattr(os, "sched_getaffinity"): + self.assertEqual(initial, list(os.sched_getaffinity(p.pid))) + self.assertEqual(len(initial), len(set(initial))) + all_cpus = list(range(len(psutil.cpu_percent(percpu=True)))) + # setting on travis doesn't seem to work (always return all + # CPUs on get): + # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, ... != [0] + for n in all_cpus: + p.cpu_affinity([n]) + self.assertEqual(p.cpu_affinity(), [n]) + if hasattr(os, "sched_getaffinity"): + self.assertEqual(p.cpu_affinity(), + list(os.sched_getaffinity(p.pid))) + # + p.cpu_affinity(all_cpus) + self.assertEqual(p.cpu_affinity(), all_cpus) + if hasattr(os, "sched_getaffinity"): + self.assertEqual(p.cpu_affinity(), + list(os.sched_getaffinity(p.pid))) + # + self.assertRaises(TypeError, p.cpu_affinity, 1) + p.cpu_affinity(initial) + # it should work with all iterables, not only lists + p.cpu_affinity(set(all_cpus)) + p.cpu_affinity(tuple(all_cpus)) + invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10] + self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu) + self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000)) + self.assertRaises(TypeError, p.cpu_affinity, [0, "1"]) + + # TODO + @unittest.skipIf(BSD, "broken on BSD, see #595") + @unittest.skipIf(APPVEYOR, + "can't find any process file on Appveyor") + def test_open_files(self): + # current process + p = psutil.Process() + files = p.open_files() + self.assertFalse(TESTFN in files) + with open(TESTFN, 'w'): + # give the kernel some time to see the new file + call_until(p.open_files, "len(ret) != %i" % len(files)) + filenames = [x.path for x in p.open_files()] + self.assertIn(TESTFN, filenames) + for file in filenames: + assert os.path.isfile(file), file + + # another process + cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN + sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True) + p = psutil.Process(sproc.pid) + + for x in range(100): + filenames = [x.path for x in p.open_files()] + if TESTFN in filenames: + break + time.sleep(.01) + else: + self.assertIn(TESTFN, filenames) + for file in filenames: + assert os.path.isfile(file), file + + # TODO + @unittest.skipIf(BSD, "broken on BSD, see #595") + @unittest.skipIf(APPVEYOR, + "can't find any process file on Appveyor") + def test_open_files2(self): + # test fd and path fields + with open(TESTFN, 'w') as fileobj: + p = psutil.Process() + for path, fd in p.open_files(): + if path == fileobj.name or fd == fileobj.fileno(): + break + else: + self.fail("no file found; files=%s" % repr(p.open_files())) + self.assertEqual(path, fileobj.name) + if WINDOWS: + self.assertEqual(fd, -1) + else: + self.assertEqual(fd, fileobj.fileno()) + # test positions + ntuple = p.open_files()[0] + self.assertEqual(ntuple[0], ntuple.path) + self.assertEqual(ntuple[1], ntuple.fd) + # test file is gone + self.assertTrue(fileobj.name not in p.open_files()) + + def compare_proc_sys_cons(self, pid, proc_cons): + from psutil._common import pconn + sys_cons = [] + for c in psutil.net_connections(kind='all'): + if c.pid == pid: + sys_cons.append(pconn(*c[:-1])) + if BSD: + # on BSD all fds are set to -1 + proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons] + self.assertEqual(sorted(proc_cons), sorted(sys_cons)) + + @skip_on_access_denied(only_if=OSX) + def test_connections(self): + def check_conn(proc, conn, family, type, laddr, raddr, status, kinds): + all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", + "tcp6", "udp", "udp4", "udp6") + check_connection_ntuple(conn) + self.assertEqual(conn.family, family) + self.assertEqual(conn.type, type) + self.assertEqual(conn.laddr, laddr) + self.assertEqual(conn.raddr, raddr) + self.assertEqual(conn.status, status) + for kind in all_kinds: + cons = proc.connections(kind=kind) + if kind in kinds: + self.assertNotEqual(cons, []) + else: + self.assertEqual(cons, []) + # compare against system-wide connections + # XXX Solaris can't retrieve system-wide UNIX + # sockets. + if not SUNOS: + self.compare_proc_sys_cons(proc.pid, [conn]) + + tcp_template = textwrap.dedent(""" + import socket, time + s = socket.socket($family, socket.SOCK_STREAM) + s.bind(('$addr', 0)) + s.listen(1) + with open('$testfn', 'w') as f: + f.write(str(s.getsockname()[:2])) + time.sleep(60) + """) + + udp_template = textwrap.dedent(""" + import socket, time + s = socket.socket($family, socket.SOCK_DGRAM) + s.bind(('$addr', 0)) + with open('$testfn', 'w') as f: + f.write(str(s.getsockname()[:2])) + time.sleep(60) + """) + + from string import Template + testfile = os.path.basename(TESTFN) + tcp4_template = Template(tcp_template).substitute( + family=int(AF_INET), addr="127.0.0.1", testfn=testfile) + udp4_template = Template(udp_template).substitute( + family=int(AF_INET), addr="127.0.0.1", testfn=testfile) + tcp6_template = Template(tcp_template).substitute( + family=int(AF_INET6), addr="::1", testfn=testfile) + udp6_template = Template(udp_template).substitute( + family=int(AF_INET6), addr="::1", testfn=testfile) + + # launch various subprocess instantiating a socket of various + # families and types to enrich psutil results + tcp4_proc = pyrun(tcp4_template) + tcp4_addr = eval(wait_for_file(testfile)) + udp4_proc = pyrun(udp4_template) + udp4_addr = eval(wait_for_file(testfile)) + if supports_ipv6(): + tcp6_proc = pyrun(tcp6_template) + tcp6_addr = eval(wait_for_file(testfile)) + udp6_proc = pyrun(udp6_template) + udp6_addr = eval(wait_for_file(testfile)) + else: + tcp6_proc = None + udp6_proc = None + tcp6_addr = None + udp6_addr = None + + for p in psutil.Process().children(): + cons = p.connections() + self.assertEqual(len(cons), 1) + for conn in cons: + # TCP v4 + if p.pid == tcp4_proc.pid: + check_conn(p, conn, AF_INET, SOCK_STREAM, tcp4_addr, (), + psutil.CONN_LISTEN, + ("all", "inet", "inet4", "tcp", "tcp4")) + # UDP v4 + elif p.pid == udp4_proc.pid: + check_conn(p, conn, AF_INET, SOCK_DGRAM, udp4_addr, (), + psutil.CONN_NONE, + ("all", "inet", "inet4", "udp", "udp4")) + # TCP v6 + elif p.pid == getattr(tcp6_proc, "pid", None): + check_conn(p, conn, AF_INET6, SOCK_STREAM, tcp6_addr, (), + psutil.CONN_LISTEN, + ("all", "inet", "inet6", "tcp", "tcp6")) + # UDP v6 + elif p.pid == getattr(udp6_proc, "pid", None): + check_conn(p, conn, AF_INET6, SOCK_DGRAM, udp6_addr, (), + psutil.CONN_NONE, + ("all", "inet", "inet6", "udp", "udp6")) + + @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), + 'AF_UNIX is not supported') + @skip_on_access_denied(only_if=OSX) + def test_connections_unix(self): + def check(type): + safe_remove(TESTFN) + sock = socket.socket(AF_UNIX, type) + with contextlib.closing(sock): + sock.bind(TESTFN) + cons = psutil.Process().connections(kind='unix') + conn = cons[0] + check_connection_ntuple(conn) + if conn.fd != -1: # != sunos and windows + self.assertEqual(conn.fd, sock.fileno()) + self.assertEqual(conn.family, AF_UNIX) + self.assertEqual(conn.type, type) + self.assertEqual(conn.laddr, TESTFN) + if not SUNOS: + # XXX Solaris can't retrieve system-wide UNIX + # sockets. + self.compare_proc_sys_cons(os.getpid(), cons) + + check(SOCK_STREAM) + check(SOCK_DGRAM) + + @unittest.skipUnless(hasattr(socket, "fromfd"), + 'socket.fromfd() is not availble') + @unittest.skipIf(WINDOWS or SUNOS, + 'connection fd not available on this platform') + def test_connection_fromfd(self): + with contextlib.closing(socket.socket()) as sock: + sock.bind(('localhost', 0)) + sock.listen(1) + p = psutil.Process() + for conn in p.connections(): + if conn.fd == sock.fileno(): + break + else: + self.fail("couldn't find socket fd") + dupsock = socket.fromfd(conn.fd, conn.family, conn.type) + with contextlib.closing(dupsock): + self.assertEqual(dupsock.getsockname(), conn.laddr) + self.assertNotEqual(sock.fileno(), dupsock.fileno()) + + def test_connection_constants(self): + ints = [] + strs = [] + for name in dir(psutil): + if name.startswith('CONN_'): + num = getattr(psutil, name) + str_ = str(num) + assert str_.isupper(), str_ + assert str_ not in strs, str_ + assert num not in ints, num + ints.append(num) + strs.append(str_) + if SUNOS: + psutil.CONN_IDLE + psutil.CONN_BOUND + if WINDOWS: + psutil.CONN_DELETE_TCB + + @unittest.skipUnless(POSIX, 'posix only') + def test_num_fds(self): + p = psutil.Process() + start = p.num_fds() + file = open(TESTFN, 'w') + self.addCleanup(file.close) + self.assertEqual(p.num_fds(), start + 1) + sock = socket.socket() + self.addCleanup(sock.close) + self.assertEqual(p.num_fds(), start + 2) + file.close() + sock.close() + self.assertEqual(p.num_fds(), start) + + @skip_on_not_implemented(only_if=LINUX) + def test_num_ctx_switches(self): + p = psutil.Process() + before = sum(p.num_ctx_switches()) + for x in range(500000): + after = sum(p.num_ctx_switches()) + if after > before: + return + self.fail("num ctx switches still the same after 50.000 iterations") + + def test_parent_ppid(self): + this_parent = os.getpid() + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + self.assertEqual(p.ppid(), this_parent) + self.assertEqual(p.parent().pid, this_parent) + # no other process is supposed to have us as parent + for p in psutil.process_iter(): + if p.pid == sproc.pid: + continue + self.assertTrue(p.ppid() != this_parent) + + def test_children(self): + p = psutil.Process() + self.assertEqual(p.children(), []) + self.assertEqual(p.children(recursive=True), []) + sproc = get_test_subprocess() + children1 = p.children() + children2 = p.children(recursive=True) + for children in (children1, children2): + self.assertEqual(len(children), 1) + self.assertEqual(children[0].pid, sproc.pid) + self.assertEqual(children[0].ppid(), os.getpid()) + + def test_children_recursive(self): + # here we create a subprocess which creates another one as in: + # A (parent) -> B (child) -> C (grandchild) + s = "import subprocess, os, sys, time;" + s += "PYTHON = os.path.realpath(sys.executable);" + s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];" + s += "subprocess.Popen(cmd);" + s += "time.sleep(60);" + get_test_subprocess(cmd=[PYTHON, "-c", s]) + p = psutil.Process() + self.assertEqual(len(p.children(recursive=False)), 1) + # give the grandchild some time to start + stop_at = time.time() + GLOBAL_TIMEOUT + while time.time() < stop_at: + children = p.children(recursive=True) + if len(children) > 1: + break + self.assertEqual(len(children), 2) + self.assertEqual(children[0].ppid(), os.getpid()) + self.assertEqual(children[1].ppid(), children[0].pid) + + def test_children_duplicates(self): + # find the process which has the highest number of children + table = collections.defaultdict(int) + for p in psutil.process_iter(): + try: + table[p.ppid()] += 1 + except psutil.Error: + pass + # this is the one, now let's make sure there are no duplicates + pid = sorted(table.items(), key=lambda x: x[1])[-1][0] + p = psutil.Process(pid) + try: + c = p.children(recursive=True) + except psutil.AccessDenied: # windows + pass + else: + self.assertEqual(len(c), len(set(c))) + + def test_suspend_resume(self): + sproc = get_test_subprocess(wait=True) + p = psutil.Process(sproc.pid) + p.suspend() + for x in range(100): + if p.status() == psutil.STATUS_STOPPED: + break + time.sleep(0.01) + p.resume() + self.assertNotEqual(p.status(), psutil.STATUS_STOPPED) + + def test_invalid_pid(self): + self.assertRaises(TypeError, psutil.Process, "1") + self.assertRaises(ValueError, psutil.Process, -1) + + def test_as_dict(self): + p = psutil.Process() + d = p.as_dict(attrs=['exe', 'name']) + self.assertEqual(sorted(d.keys()), ['exe', 'name']) + + p = psutil.Process(min(psutil.pids())) + d = p.as_dict(attrs=['connections'], ad_value='foo') + if not isinstance(d['connections'], list): + self.assertEqual(d['connections'], 'foo') + + def test_halfway_terminated_process(self): + # Test that NoSuchProcess exception gets raised in case the + # process dies after we create the Process object. + # Example: + # >>> proc = Process(1234) + # >>> time.sleep(2) # time-consuming task, process dies in meantime + # >>> proc.name() + # Refers to Issue #15 + sproc = get_test_subprocess() + p = psutil.Process(sproc.pid) + p.terminate() + p.wait() + if WINDOWS: + wait_for_pid(p.pid) + self.assertFalse(p.is_running()) + self.assertFalse(p.pid in psutil.pids()) + + excluded_names = ['pid', 'is_running', 'wait', 'create_time'] + if LINUX and not RLIMIT_SUPPORT: + excluded_names.append('rlimit') + for name in dir(p): + if (name.startswith('_') or + name in excluded_names): + continue + try: + meth = getattr(p, name) + # get/set methods + if name == 'nice': + if POSIX: + ret = meth(1) + else: + ret = meth(psutil.NORMAL_PRIORITY_CLASS) + elif name == 'ionice': + ret = meth() + ret = meth(2) + elif name == 'rlimit': + ret = meth(psutil.RLIMIT_NOFILE) + ret = meth(psutil.RLIMIT_NOFILE, (5, 5)) + elif name == 'cpu_affinity': + ret = meth() + ret = meth([0]) + elif name == 'send_signal': + ret = meth(signal.SIGTERM) + else: + ret = meth() + except psutil.ZombieProcess: + self.fail("ZombieProcess for %r was not supposed to happen" % + name) + except psutil.NoSuchProcess: + pass + except NotImplementedError: + pass + else: + self.fail( + "NoSuchProcess exception not raised for %r, retval=%s" % ( + name, ret)) + + @unittest.skipUnless(POSIX, 'posix only') + def test_zombie_process(self): + def succeed_or_zombie_p_exc(fun, *args, **kwargs): + try: + fun(*args, **kwargs) + except (psutil.ZombieProcess, psutil.AccessDenied): + pass + + # Note: in this test we'll be creating two sub processes. + # Both of them are supposed to be freed / killed by + # reap_children() as they are attributable to 'us' + # (os.getpid()) via children(recursive=True). + src = textwrap.dedent("""\ + import os, sys, time, socket, contextlib + child_pid = os.fork() + if child_pid > 0: + time.sleep(3000) + else: + # this is the zombie process + s = socket.socket(socket.AF_UNIX) + with contextlib.closing(s): + s.connect('%s') + if sys.version_info < (3, ): + pid = str(os.getpid()) + else: + pid = bytes(str(os.getpid()), 'ascii') + s.sendall(pid) + """ % TESTFN) + with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock: + try: + sock.settimeout(GLOBAL_TIMEOUT) + sock.bind(TESTFN) + sock.listen(1) + pyrun(src) + conn, _ = sock.accept() + select.select([conn.fileno()], [], [], GLOBAL_TIMEOUT) + zpid = int(conn.recv(1024)) + zproc = psutil.Process(zpid) + call_until(lambda: zproc.status(), + "ret == psutil.STATUS_ZOMBIE") + # A zombie process should always be instantiable + zproc = psutil.Process(zpid) + # ...and at least its status always be querable + self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE) + # ...and it should be considered 'running' + self.assertTrue(zproc.is_running()) + # ...and as_dict() shouldn't crash + zproc.as_dict() + if hasattr(zproc, "rlimit"): + succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE) + succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE, + (5, 5)) + # set methods + succeed_or_zombie_p_exc(zproc.parent) + if hasattr(zproc, 'cpu_affinity'): + succeed_or_zombie_p_exc(zproc.cpu_affinity, [0]) + succeed_or_zombie_p_exc(zproc.nice, 0) + if hasattr(zproc, 'ionice'): + if LINUX: + succeed_or_zombie_p_exc(zproc.ionice, 2, 0) + else: + succeed_or_zombie_p_exc(zproc.ionice, 0) # Windows + if hasattr(zproc, 'rlimit'): + succeed_or_zombie_p_exc(zproc.rlimit, + psutil.RLIMIT_NOFILE, (5, 5)) + succeed_or_zombie_p_exc(zproc.suspend) + succeed_or_zombie_p_exc(zproc.resume) + succeed_or_zombie_p_exc(zproc.terminate) + succeed_or_zombie_p_exc(zproc.kill) + + # ...its parent should 'see' it + # edit: not true on BSD and OSX + # descendants = [x.pid for x in psutil.Process().children( + # recursive=True)] + # self.assertIn(zpid, descendants) + # XXX should we also assume ppid be usable? Note: this + # would be an important use case as the only way to get + # rid of a zombie is to kill its parent. + # self.assertEqual(zpid.ppid(), os.getpid()) + # ...and all other APIs should be able to deal with it + self.assertTrue(psutil.pid_exists(zpid)) + self.assertIn(zpid, psutil.pids()) + self.assertIn(zpid, [x.pid for x in psutil.process_iter()]) + psutil._pmap = {} + self.assertIn(zpid, [x.pid for x in psutil.process_iter()]) + finally: + reap_children(search_all=True) + + def test_pid_0(self): + # Process(0) is supposed to work on all platforms except Linux + if 0 not in psutil.pids(): + self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0) + return + + p = psutil.Process(0) + self.assertTrue(p.name()) + + if POSIX: + try: + self.assertEqual(p.uids().real, 0) + self.assertEqual(p.gids().real, 0) + except psutil.AccessDenied: + pass + + self.assertRaisesRegexp( + ValueError, "preventing sending signal to process with PID 0", + p.send_signal, signal.SIGTERM) + + self.assertIn(p.ppid(), (0, 1)) + # self.assertEqual(p.exe(), "") + p.cmdline() + try: + p.num_threads() + except psutil.AccessDenied: + pass + + try: + p.memory_info() + except psutil.AccessDenied: + pass + + try: + if POSIX: + self.assertEqual(p.username(), 'root') + elif WINDOWS: + self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM') + else: + p.username() + except psutil.AccessDenied: + pass + + self.assertIn(0, psutil.pids()) + self.assertTrue(psutil.pid_exists(0)) + + def test_Popen(self): + # Popen class test + # XXX this test causes a ResourceWarning on Python 3 because + # psutil.__subproc instance doesn't get propertly freed. + # Not sure what to do though. + cmd = [PYTHON, "-c", "import time; time.sleep(60);"] + proc = psutil.Popen(cmd, stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + try: + proc.name() + proc.stdin + self.assertTrue(hasattr(proc, 'name')) + self.assertTrue(hasattr(proc, 'stdin')) + self.assertTrue(dir(proc)) + self.assertRaises(AttributeError, getattr, proc, 'foo') + finally: + proc.kill() + proc.wait() + self.assertIsNotNone(proc.returncode) + + +# =================================================================== +# --- Featch all processes test +# =================================================================== + +class TestFetchAllProcesses(unittest.TestCase): + """Test which iterates over all running processes and performs + some sanity checks against Process API's returned values. + """ + + def setUp(self): + if POSIX: + import pwd + pall = pwd.getpwall() + self._uids = set([x.pw_uid for x in pall]) + self._usernames = set([x.pw_name for x in pall]) + + def test_fetch_all(self): + valid_procs = 0 + excluded_names = set([ + 'send_signal', 'suspend', 'resume', 'terminate', 'kill', 'wait', + 'as_dict', 'cpu_percent', 'parent', 'children', 'pid']) + if LINUX and not RLIMIT_SUPPORT: + excluded_names.add('rlimit') + attrs = [] + for name in dir(psutil.Process): + if name.startswith("_"): + continue + if name in excluded_names: + continue + attrs.append(name) + + default = object() + failures = [] + for name in attrs: + for p in psutil.process_iter(): + ret = default + try: + try: + args = () + attr = getattr(p, name, None) + if attr is not None and callable(attr): + if name == 'rlimit': + args = (psutil.RLIMIT_NOFILE,) + ret = attr(*args) + else: + ret = attr + valid_procs += 1 + except NotImplementedError: + msg = "%r was skipped because not implemented" % ( + self.__class__.__name__ + '.test_' + name) + warn(msg) + except (psutil.NoSuchProcess, psutil.AccessDenied) as err: + self.assertEqual(err.pid, p.pid) + if err.name: + # make sure exception's name attr is set + # with the actual process name + self.assertEqual(err.name, p.name()) + self.assertTrue(str(err)) + self.assertTrue(err.msg) + else: + if ret not in (0, 0.0, [], None, ''): + assert ret, ret + meth = getattr(self, name) + meth(ret) + except Exception as err: + s = '\n' + '=' * 70 + '\n' + s += "FAIL: test_%s (proc=%s" % (name, p) + if ret != default: + s += ", ret=%s)" % repr(ret) + s += ')\n' + s += '-' * 70 + s += "\n%s" % traceback.format_exc() + s = "\n".join((" " * 4) + i for i in s.splitlines()) + failures.append(s) + break + + if failures: + self.fail(''.join(failures)) + + # we should always have a non-empty list, not including PID 0 etc. + # special cases. + self.assertTrue(valid_procs > 0) + + def cmdline(self, ret): + pass + + def exe(self, ret): + if not ret: + self.assertEqual(ret, '') + else: + assert os.path.isabs(ret), ret + # Note: os.stat() may return False even if the file is there + # hence we skip the test, see: + # http://stackoverflow.com/questions/3112546/os-path-exists-lies + if POSIX and os.path.isfile(ret): + if hasattr(os, 'access') and hasattr(os, "X_OK"): + # XXX may fail on OSX + self.assertTrue(os.access(ret, os.X_OK)) + + def ppid(self, ret): + self.assertTrue(ret >= 0) + + def name(self, ret): + self.assertIsInstance(ret, (str, unicode)) + self.assertTrue(ret) + + def create_time(self, ret): + self.assertTrue(ret > 0) + # this can't be taken for granted on all platforms + # self.assertGreaterEqual(ret, psutil.boot_time()) + # make sure returned value can be pretty printed + # with strftime + time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret)) + + def uids(self, ret): + for uid in ret: + self.assertTrue(uid >= 0) + self.assertIn(uid, self._uids) + + def gids(self, ret): + # note: testing all gids as above seems not to be reliable for + # gid == 30 (nodoby); not sure why. + for gid in ret: + self.assertTrue(gid >= 0) + # self.assertIn(uid, self.gids + + def username(self, ret): + self.assertTrue(ret) + if POSIX: + self.assertIn(ret, self._usernames) + + def status(self, ret): + self.assertTrue(ret != "") + self.assertTrue(ret != '?') + self.assertIn(ret, VALID_PROC_STATUSES) + + def io_counters(self, ret): + for field in ret: + if field != -1: + self.assertTrue(field >= 0) + + def ionice(self, ret): + if LINUX: + self.assertTrue(ret.ioclass >= 0) + self.assertTrue(ret.value >= 0) + else: + self.assertTrue(ret >= 0) + self.assertIn(ret, (0, 1, 2)) + + def num_threads(self, ret): + self.assertTrue(ret >= 1) + + def threads(self, ret): + for t in ret: + self.assertTrue(t.id >= 0) + self.assertTrue(t.user_time >= 0) + self.assertTrue(t.system_time >= 0) + + def cpu_times(self, ret): + self.assertTrue(ret.user >= 0) + self.assertTrue(ret.system >= 0) + + def memory_info(self, ret): + self.assertTrue(ret.rss >= 0) + self.assertTrue(ret.vms >= 0) + + def memory_info_ex(self, ret): + for name in ret._fields: + self.assertTrue(getattr(ret, name) >= 0) + if POSIX and ret.vms != 0: + # VMS is always supposed to be the highest + for name in ret._fields: + if name != 'vms': + value = getattr(ret, name) + assert ret.vms > value, ret + elif WINDOWS: + assert ret.peak_wset >= ret.wset, ret + assert ret.peak_paged_pool >= ret.paged_pool, ret + assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret + assert ret.peak_pagefile >= ret.pagefile, ret + + def open_files(self, ret): + for f in ret: + if WINDOWS: + assert f.fd == -1, f + else: + self.assertIsInstance(f.fd, int) + assert os.path.isabs(f.path), f + assert os.path.isfile(f.path), f + + def num_fds(self, ret): + self.assertTrue(ret >= 0) + + def connections(self, ret): + self.assertEqual(len(ret), len(set(ret))) + for conn in ret: + check_connection_ntuple(conn) + + def cwd(self, ret): + if ret is not None: # BSD may return None + assert os.path.isabs(ret), ret + try: + st = os.stat(ret) + except OSError as err: + # directory has been removed in mean time + if err.errno != errno.ENOENT: + raise + else: + self.assertTrue(stat.S_ISDIR(st.st_mode)) + + def memory_percent(self, ret): + assert 0 <= ret <= 100, ret + + def is_running(self, ret): + self.assertTrue(ret) + + def cpu_affinity(self, ret): + assert ret != [], ret + + def terminal(self, ret): + if ret is not None: + assert os.path.isabs(ret), ret + assert os.path.exists(ret), ret + + def memory_maps(self, ret): + for nt in ret: + for fname in nt._fields: + value = getattr(nt, fname) + if fname == 'path': + if not value.startswith('['): + assert os.path.isabs(nt.path), nt.path + # commented as on Linux we might get + # '/foo/bar (deleted)' + # assert os.path.exists(nt.path), nt.path + elif fname in ('addr', 'perms'): + self.assertTrue(value) + else: + self.assertIsInstance(value, (int, long)) + assert value >= 0, value + + def num_handles(self, ret): + if WINDOWS: + self.assertGreaterEqual(ret, 0) + else: + self.assertGreaterEqual(ret, 0) + + def nice(self, ret): + if POSIX: + assert -20 <= ret <= 20, ret + else: + priorities = [getattr(psutil, x) for x in dir(psutil) + if x.endswith('_PRIORITY_CLASS')] + self.assertIn(ret, priorities) + + def num_ctx_switches(self, ret): + self.assertTrue(ret.voluntary >= 0) + self.assertTrue(ret.involuntary >= 0) + + def rlimit(self, ret): + self.assertEqual(len(ret), 2) + self.assertGreaterEqual(ret[0], -1) + self.assertGreaterEqual(ret[1], -1) + + +# =================================================================== +# --- Limited user tests +# =================================================================== + +@unittest.skipUnless(POSIX, "UNIX only") +@unittest.skipUnless(hasattr(os, 'getuid') and os.getuid() == 0, + "super user privileges are required") +class LimitedUserTestCase(TestProcess): + """Repeat the previous tests by using a limited user. + Executed only on UNIX and only if the user who run the test script + is root. + """ + # the uid/gid the test suite runs under + if hasattr(os, 'getuid'): + PROCESS_UID = os.getuid() + PROCESS_GID = os.getgid() + + def __init__(self, *args, **kwargs): + TestProcess.__init__(self, *args, **kwargs) + # re-define all existent test methods in order to + # ignore AccessDenied exceptions + for attr in [x for x in dir(self) if x.startswith('test')]: + meth = getattr(self, attr) + + def test_(self): + try: + meth() + except psutil.AccessDenied: + pass + setattr(self, attr, types.MethodType(test_, self)) + + def setUp(self): + safe_remove(TESTFN) + TestProcess.setUp(self) + os.setegid(1000) + os.seteuid(1000) + + def tearDown(self): + os.setegid(self.PROCESS_UID) + os.seteuid(self.PROCESS_GID) + TestProcess.tearDown(self) + + def test_nice(self): + try: + psutil.Process().nice(-1) + except psutil.AccessDenied: + pass + else: + self.fail("exception not raised") + + def test_zombie_process(self): + # causes problems if test test suite is run as root + pass + + +# =================================================================== +# --- Misc tests +# =================================================================== + +class TestMisc(unittest.TestCase): + """Misc / generic tests.""" + + def test_process__repr__(self, func=repr): + p = psutil.Process() + r = func(p) + self.assertIn("psutil.Process", r) + self.assertIn("pid=%s" % p.pid, r) + self.assertIn("name=", r) + self.assertIn(p.name(), r) + with mock.patch.object(psutil.Process, "name", + side_effect=psutil.ZombieProcess(os.getpid())): + p = psutil.Process() + r = func(p) + self.assertIn("pid=%s" % p.pid, r) + self.assertIn("zombie", r) + self.assertNotIn("name=", r) + with mock.patch.object(psutil.Process, "name", + side_effect=psutil.NoSuchProcess(os.getpid())): + p = psutil.Process() + r = func(p) + self.assertIn("pid=%s" % p.pid, r) + self.assertIn("terminated", r) + self.assertNotIn("name=", r) + + def test_process__str__(self): + self.test_process__repr__(func=str) + + def test_no_such_process__repr__(self, func=repr): + self.assertEqual( + repr(psutil.NoSuchProcess(321)), + "psutil.NoSuchProcess process no longer exists (pid=321)") + self.assertEqual( + repr(psutil.NoSuchProcess(321, name='foo')), + "psutil.NoSuchProcess process no longer exists (pid=321, " + "name='foo')") + self.assertEqual( + repr(psutil.NoSuchProcess(321, msg='foo')), + "psutil.NoSuchProcess foo") + + def test_zombie_process__repr__(self, func=repr): + self.assertEqual( + repr(psutil.ZombieProcess(321)), + "psutil.ZombieProcess process still exists but it's a zombie " + "(pid=321)") + self.assertEqual( + repr(psutil.ZombieProcess(321, name='foo')), + "psutil.ZombieProcess process still exists but it's a zombie " + "(pid=321, name='foo')") + self.assertEqual( + repr(psutil.ZombieProcess(321, name='foo', ppid=1)), + "psutil.ZombieProcess process still exists but it's a zombie " + "(pid=321, name='foo', ppid=1)") + self.assertEqual( + repr(psutil.ZombieProcess(321, msg='foo')), + "psutil.ZombieProcess foo") + + def test_access_denied__repr__(self, func=repr): + self.assertEqual( + repr(psutil.AccessDenied(321)), + "psutil.AccessDenied (pid=321)") + self.assertEqual( + repr(psutil.AccessDenied(321, name='foo')), + "psutil.AccessDenied (pid=321, name='foo')") + self.assertEqual( + repr(psutil.AccessDenied(321, msg='foo')), + "psutil.AccessDenied foo") + + def test_timeout_expired__repr__(self, func=repr): + self.assertEqual( + repr(psutil.TimeoutExpired(321)), + "psutil.TimeoutExpired timeout after 321 seconds") + self.assertEqual( + repr(psutil.TimeoutExpired(321, pid=111)), + "psutil.TimeoutExpired timeout after 321 seconds (pid=111)") + self.assertEqual( + repr(psutil.TimeoutExpired(321, pid=111, name='foo')), + "psutil.TimeoutExpired timeout after 321 seconds " + "(pid=111, name='foo')") + + def test_process__eq__(self): + p1 = psutil.Process() + p2 = psutil.Process() + self.assertEqual(p1, p2) + p2._ident = (0, 0) + self.assertNotEqual(p1, p2) + self.assertNotEqual(p1, 'foo') + + def test_process__hash__(self): + s = set([psutil.Process(), psutil.Process()]) + self.assertEqual(len(s), 1) + + def test__all__(self): + dir_psutil = dir(psutil) + for name in dir_psutil: + if name in ('callable', 'error', 'namedtuple', + 'long', 'test', 'NUM_CPUS', 'BOOT_TIME', + 'TOTAL_PHYMEM'): + continue + if not name.startswith('_'): + try: + __import__(name) + except ImportError: + if name not in psutil.__all__: + fun = getattr(psutil, name) + if fun is None: + continue + if (fun.__doc__ is not None and + 'deprecated' not in fun.__doc__.lower()): + self.fail('%r not in psutil.__all__' % name) + + # Import 'star' will break if __all__ is inconsistent, see: + # https://github.com/giampaolo/psutil/issues/656 + # Can't do `from psutil import *` as it won't work on python 3 + # so we simply iterate over __all__. + for name in psutil.__all__: + self.assertIn(name, dir_psutil) + + def test_version(self): + self.assertEqual('.'.join([str(x) for x in psutil.version_info]), + psutil.__version__) + + def test_memoize(self): + from psutil._common import memoize + + @memoize + def foo(*args, **kwargs): + "foo docstring" + calls.append(None) + return (args, kwargs) + + calls = [] + # no args + for x in range(2): + ret = foo() + expected = ((), {}) + self.assertEqual(ret, expected) + self.assertEqual(len(calls), 1) + # with args + for x in range(2): + ret = foo(1) + expected = ((1, ), {}) + self.assertEqual(ret, expected) + self.assertEqual(len(calls), 2) + # with args + kwargs + for x in range(2): + ret = foo(1, bar=2) + expected = ((1, ), {'bar': 2}) + self.assertEqual(ret, expected) + self.assertEqual(len(calls), 3) + # clear cache + foo.cache_clear() + ret = foo() + expected = ((), {}) + self.assertEqual(ret, expected) + self.assertEqual(len(calls), 4) + # docstring + self.assertEqual(foo.__doc__, "foo docstring") + + def test_isfile_strict(self): + from psutil._common import isfile_strict + this_file = os.path.abspath(__file__) + assert isfile_strict(this_file) + assert not isfile_strict(os.path.dirname(this_file)) + with mock.patch('psutil._common.os.stat', + side_effect=OSError(errno.EPERM, "foo")): + self.assertRaises(OSError, isfile_strict, this_file) + with mock.patch('psutil._common.os.stat', + side_effect=OSError(errno.EACCES, "foo")): + self.assertRaises(OSError, isfile_strict, this_file) + with mock.patch('psutil._common.os.stat', + side_effect=OSError(errno.EINVAL, "foo")): + assert not isfile_strict(this_file) + with mock.patch('psutil._common.stat.S_ISREG', return_value=False): + assert not isfile_strict(this_file) + + def test_serialization(self): + def check(ret): + if json is not None: + json.loads(json.dumps(ret)) + a = pickle.dumps(ret) + b = pickle.loads(a) + self.assertEqual(ret, b) + + check(psutil.Process().as_dict()) + check(psutil.virtual_memory()) + check(psutil.swap_memory()) + check(psutil.cpu_times()) + check(psutil.cpu_times_percent(interval=0)) + check(psutil.net_io_counters()) + if LINUX and not os.path.exists('/proc/diskstats'): + pass + else: + if not APPVEYOR: + check(psutil.disk_io_counters()) + check(psutil.disk_partitions()) + check(psutil.disk_usage(os.getcwd())) + check(psutil.users()) + + def test_setup_script(self): + here = os.path.abspath(os.path.dirname(__file__)) + setup_py = os.path.realpath(os.path.join(here, '..', 'setup.py')) + module = imp.load_source('setup', setup_py) + self.assertRaises(SystemExit, module.setup) + self.assertEqual(module.get_version(), psutil.__version__) + + def test_ad_on_process_creation(self): + # We are supposed to be able to instantiate Process also in case + # of zombie processes or access denied. + with mock.patch.object(psutil.Process, 'create_time', + side_effect=psutil.AccessDenied) as meth: + psutil.Process() + assert meth.called + with mock.patch.object(psutil.Process, 'create_time', + side_effect=psutil.ZombieProcess(1)) as meth: + psutil.Process() + assert meth.called + with mock.patch.object(psutil.Process, 'create_time', + side_effect=ValueError) as meth: + with self.assertRaises(ValueError): + psutil.Process() + assert meth.called + + +# =================================================================== +# --- Example script tests +# =================================================================== + +class TestExampleScripts(unittest.TestCase): + """Tests for scripts in the examples directory.""" + + def assert_stdout(self, exe, args=None): + exe = os.path.join(EXAMPLES_DIR, exe) + if args: + exe = exe + ' ' + args + try: + out = sh(sys.executable + ' ' + exe).strip() + except RuntimeError as err: + if 'AccessDenied' in str(err): + return str(err) + else: + raise + assert out, out + return out + + def assert_syntax(self, exe, args=None): + exe = os.path.join(EXAMPLES_DIR, exe) + with open(exe, 'r') as f: + src = f.read() + ast.parse(src) + + def test_check_presence(self): + # make sure all example scripts have a test method defined + meths = dir(self) + for name in os.listdir(EXAMPLES_DIR): + if name.endswith('.py'): + if 'test_' + os.path.splitext(name)[0] not in meths: + # self.assert_stdout(name) + self.fail('no test defined for %r script' + % os.path.join(EXAMPLES_DIR, name)) + + def test_disk_usage(self): + self.assert_stdout('disk_usage.py') + + def test_free(self): + self.assert_stdout('free.py') + + def test_meminfo(self): + self.assert_stdout('meminfo.py') + + def test_process_detail(self): + self.assert_stdout('process_detail.py') + + @unittest.skipIf(APPVEYOR, "can't find users on Appveyor") + def test_who(self): + self.assert_stdout('who.py') + + def test_ps(self): + self.assert_stdout('ps.py') + + def test_pstree(self): + self.assert_stdout('pstree.py') + + def test_netstat(self): + self.assert_stdout('netstat.py') + + @unittest.skipIf(TRAVIS, "permission denied on travis") + def test_ifconfig(self): + self.assert_stdout('ifconfig.py') + + def test_pmap(self): + self.assert_stdout('pmap.py', args=str(os.getpid())) + + @unittest.skipIf(ast is None, + 'ast module not available on this python version') + def test_killall(self): + self.assert_syntax('killall.py') + + @unittest.skipIf(ast is None, + 'ast module not available on this python version') + def test_nettop(self): + self.assert_syntax('nettop.py') + + @unittest.skipIf(ast is None, + 'ast module not available on this python version') + def test_top(self): + self.assert_syntax('top.py') + + @unittest.skipIf(ast is None, + 'ast module not available on this python version') + def test_iotop(self): + self.assert_syntax('iotop.py') + + def test_pidof(self): + output = self.assert_stdout('pidof.py %s' % psutil.Process().name()) + self.assertIn(str(os.getpid()), output) + + +def main(): + tests = [] + test_suite = unittest.TestSuite() + tests.append(TestSystemAPIs) + tests.append(TestProcess) + tests.append(TestFetchAllProcesses) + tests.append(TestMisc) + tests.append(TestExampleScripts) + tests.append(LimitedUserTestCase) + + if POSIX: + from _posix import PosixSpecificTestCase + tests.append(PosixSpecificTestCase) + + # import the specific platform test suite + stc = None + if LINUX: + from _linux import LinuxSpecificTestCase as stc + elif WINDOWS: + from _windows import WindowsSpecificTestCase as stc + from _windows import TestDualProcessImplementation + tests.append(TestDualProcessImplementation) + elif OSX: + from _osx import OSXSpecificTestCase as stc + elif BSD: + from _bsd import BSDSpecificTestCase as stc + elif SUNOS: + from _sunos import SunOSSpecificTestCase as stc + if stc is not None: + tests.append(stc) + + for test_class in tests: + test_suite.addTest(unittest.makeSuite(test_class)) + result = unittest.TextTestRunner(verbosity=2).run(test_suite) + return result.wasSuccessful() + +if __name__ == '__main__': + if not main(): + sys.exit(1) |