diff options
Diffstat (limited to 'python/psutil/test')
m--------- | python/psutil | 0 | ||||
-rw-r--r-- | python/psutil/test/README.rst | 21 | ||||
-rw-r--r-- | python/psutil/test/_bsd.py | 252 | ||||
-rw-r--r-- | python/psutil/test/_linux.py | 473 | ||||
-rw-r--r-- | python/psutil/test/_osx.py | 160 | ||||
-rw-r--r-- | python/psutil/test/_posix.py | 258 | ||||
-rw-r--r-- | python/psutil/test/_sunos.py | 48 | ||||
-rw-r--r-- | python/psutil/test/_windows.py | 464 | ||||
-rw-r--r-- | python/psutil/test/test_memory_leaks.py | 445 | ||||
-rw-r--r-- | python/psutil/test/test_psutil.py | 3013 |
10 files changed, 0 insertions, 5134 deletions
diff --git a/python/psutil b/python/psutil new file mode 160000 +Subproject a0967043b5819b2edc61d9a12306289d5e7f98c diff --git a/python/psutil/test/README.rst b/python/psutil/test/README.rst deleted file mode 100644 index 3f2a468ef..000000000 --- a/python/psutil/test/README.rst +++ /dev/null @@ -1,21 +0,0 @@ -- The recommended way to run tests (also on Windows) is to cd into parent - directory and run ``make test`` - -- Dependencies for running tests: - - python 2.6: ipaddress, mock, unittest2 - - python 2.7: ipaddress, mock - - python 3.2: ipaddress, mock - - python 3.3: ipaddress - - python >= 3.4: no deps required - -- The main test script is ``test_psutil.py``, which also imports platform-specific - ``_*.py`` scripts (which should be ignored). - -- ``test_memory_leaks.py`` looks for memory leaks into C extension modules and must - be run separately with ``make test-memleaks``. - -- To run tests on all supported Python version install tox (pip install tox) - then run ``tox``. - -- Every time a commit is pushed tests are automatically run on Travis: - https://travis-ci.org/giampaolo/psutil/ diff --git a/python/psutil/test/_bsd.py b/python/psutil/test/_bsd.py deleted file mode 100644 index e4a3225d2..000000000 --- a/python/psutil/test/_bsd.py +++ /dev/null @@ -1,252 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -# TODO: add test for comparing connections with 'sockstat' cmd - -"""BSD specific tests. These are implicitly run by test_psutil.py.""" - -import os -import subprocess -import sys -import time - -import psutil - -from psutil._compat import PY3 -from test_psutil import (TOLERANCE, BSD, sh, get_test_subprocess, which, - retry_before_failing, reap_children, unittest) - - -PAGESIZE = os.sysconf("SC_PAGE_SIZE") -if os.getuid() == 0: # muse requires root privileges - MUSE_AVAILABLE = which('muse') -else: - MUSE_AVAILABLE = False - - -def sysctl(cmdline): - """Expects a sysctl command with an argument and parse the result - returning only the value of interest. - """ - result = sh("sysctl " + cmdline) - result = result[result.find(": ") + 2:] - try: - return int(result) - except ValueError: - return result - - -def muse(field): - """Thin wrapper around 'muse' cmdline utility.""" - out = sh('muse') - for line in out.split('\n'): - if line.startswith(field): - break - else: - raise ValueError("line not found") - return int(line.split()[1]) - - -@unittest.skipUnless(BSD, "not a BSD system") -class BSDSpecificTestCase(unittest.TestCase): - - @classmethod - def setUpClass(cls): - cls.pid = get_test_subprocess().pid - - @classmethod - def tearDownClass(cls): - reap_children() - - def test_boot_time(self): - s = sysctl('sysctl kern.boottime') - s = s[s.find(" sec = ") + 7:] - s = s[:s.find(',')] - btime = int(s) - self.assertEqual(btime, psutil.boot_time()) - - def test_process_create_time(self): - cmdline = "ps -o lstart -p %s" % self.pid - p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) - output = p.communicate()[0] - if PY3: - output = str(output, sys.stdout.encoding) - start_ps = output.replace('STARTED', '').strip() - start_psutil = psutil.Process(self.pid).create_time() - start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", - time.localtime(start_psutil)) - self.assertEqual(start_ps, start_psutil) - - def test_disks(self): - # test psutil.disk_usage() and psutil.disk_partitions() - # against "df -a" - def df(path): - out = sh('df -k "%s"' % path).strip() - lines = out.split('\n') - lines.pop(0) - line = lines.pop(0) - dev, total, used, free = line.split()[:4] - if dev == 'none': - dev = '' - total = int(total) * 1024 - used = int(used) * 1024 - free = int(free) * 1024 - return dev, total, used, free - - for part in psutil.disk_partitions(all=False): - usage = psutil.disk_usage(part.mountpoint) - dev, total, used, free = df(part.mountpoint) - self.assertEqual(part.device, dev) - self.assertEqual(usage.total, total) - # 10 MB tollerance - if abs(usage.free - free) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % (usage.free, free)) - if abs(usage.used - used) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % (usage.used, used)) - - @retry_before_failing() - def test_memory_maps(self): - out = sh('procstat -v %s' % self.pid) - maps = psutil.Process(self.pid).memory_maps(grouped=False) - lines = out.split('\n')[1:] - while lines: - line = lines.pop() - fields = line.split() - _, start, stop, perms, res = fields[:5] - map = maps.pop() - self.assertEqual("%s-%s" % (start, stop), map.addr) - self.assertEqual(int(res), map.rss) - if not map.path.startswith('['): - self.assertEqual(fields[10], map.path) - - def test_exe(self): - out = sh('procstat -b %s' % self.pid) - self.assertEqual(psutil.Process(self.pid).exe(), - out.split('\n')[1].split()[-1]) - - def test_cmdline(self): - out = sh('procstat -c %s' % self.pid) - self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()), - ' '.join(out.split('\n')[1].split()[2:])) - - def test_uids_gids(self): - out = sh('procstat -s %s' % self.pid) - euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8] - p = psutil.Process(self.pid) - uids = p.uids() - gids = p.gids() - self.assertEqual(uids.real, int(ruid)) - self.assertEqual(uids.effective, int(euid)) - self.assertEqual(uids.saved, int(suid)) - self.assertEqual(gids.real, int(rgid)) - self.assertEqual(gids.effective, int(egid)) - self.assertEqual(gids.saved, int(sgid)) - - # --- virtual_memory(); tests against sysctl - - def test_vmem_total(self): - syst = sysctl("sysctl vm.stats.vm.v_page_count") * PAGESIZE - self.assertEqual(psutil.virtual_memory().total, syst) - - @retry_before_failing() - def test_vmem_active(self): - syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE - self.assertAlmostEqual(psutil.virtual_memory().active, syst, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_inactive(self): - syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE - self.assertAlmostEqual(psutil.virtual_memory().inactive, syst, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_wired(self): - syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE - self.assertAlmostEqual(psutil.virtual_memory().wired, syst, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_cached(self): - syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE - self.assertAlmostEqual(psutil.virtual_memory().cached, syst, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_free(self): - syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE - self.assertAlmostEqual(psutil.virtual_memory().free, syst, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_buffers(self): - syst = sysctl("vfs.bufspace") - self.assertAlmostEqual(psutil.virtual_memory().buffers, syst, - delta=TOLERANCE) - - def test_cpu_count_logical(self): - syst = sysctl("hw.ncpu") - self.assertEqual(psutil.cpu_count(logical=True), syst) - - # --- virtual_memory(); tests against muse - - @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") - def test_total(self): - num = muse('Total') - self.assertEqual(psutil.virtual_memory().total, num) - - @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") - @retry_before_failing() - def test_active(self): - num = muse('Active') - self.assertAlmostEqual(psutil.virtual_memory().active, num, - delta=TOLERANCE) - - @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") - @retry_before_failing() - def test_inactive(self): - num = muse('Inactive') - self.assertAlmostEqual(psutil.virtual_memory().inactive, num, - delta=TOLERANCE) - - @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") - @retry_before_failing() - def test_wired(self): - num = muse('Wired') - self.assertAlmostEqual(psutil.virtual_memory().wired, num, - delta=TOLERANCE) - - @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") - @retry_before_failing() - def test_cached(self): - num = muse('Cache') - self.assertAlmostEqual(psutil.virtual_memory().cached, num, - delta=TOLERANCE) - - @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") - @retry_before_failing() - def test_free(self): - num = muse('Free') - self.assertAlmostEqual(psutil.virtual_memory().free, num, - delta=TOLERANCE) - - @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available") - @retry_before_failing() - def test_buffers(self): - num = muse('Buffer') - self.assertAlmostEqual(psutil.virtual_memory().buffers, num, - delta=TOLERANCE) - - -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - -if __name__ == '__main__': - if not main(): - sys.exit(1) diff --git a/python/psutil/test/_linux.py b/python/psutil/test/_linux.py deleted file mode 100644 index c1927ea8b..000000000 --- a/python/psutil/test/_linux.py +++ /dev/null @@ -1,473 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Linux specific tests. These are implicitly run by test_psutil.py.""" - -from __future__ import division -import contextlib -import errno -import fcntl -import io -import os -import pprint -import re -import socket -import struct -import sys -import tempfile -import time -import warnings - -try: - from unittest import mock # py3 -except ImportError: - import mock # requires "pip install mock" - -from test_psutil import POSIX, TOLERANCE, TRAVIS, LINUX -from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess, - retry_before_failing, get_kernel_version, unittest, - which, call_until) - -import psutil -import psutil._pslinux -from psutil._compat import PY3, u - - -SIOCGIFADDR = 0x8915 -SIOCGIFCONF = 0x8912 -SIOCGIFHWADDR = 0x8927 - - -def get_ipv4_address(ifname): - ifname = ifname[:15] - if PY3: - ifname = bytes(ifname, 'ascii') - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - with contextlib.closing(s): - return socket.inet_ntoa( - fcntl.ioctl(s.fileno(), - SIOCGIFADDR, - struct.pack('256s', ifname))[20:24]) - - -def get_mac_address(ifname): - ifname = ifname[:15] - if PY3: - ifname = bytes(ifname, 'ascii') - s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - with contextlib.closing(s): - info = fcntl.ioctl( - s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname)) - if PY3: - def ord(x): - return x - else: - import __builtin__ - ord = __builtin__.ord - return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1] - - -@unittest.skipUnless(LINUX, "not a Linux system") -class LinuxSpecificTestCase(unittest.TestCase): - - @unittest.skipIf( - POSIX and not hasattr(os, 'statvfs'), - reason="os.statvfs() function not available on this platform") - @skip_on_not_implemented() - def test_disks(self): - # test psutil.disk_usage() and psutil.disk_partitions() - # against "df -a" - def df(path): - out = sh('df -P -B 1 "%s"' % path).strip() - lines = out.split('\n') - lines.pop(0) - line = lines.pop(0) - dev, total, used, free = line.split()[:4] - if dev == 'none': - dev = '' - total, used, free = int(total), int(used), int(free) - return dev, total, used, free - - for part in psutil.disk_partitions(all=False): - usage = psutil.disk_usage(part.mountpoint) - dev, total, used, free = df(part.mountpoint) - self.assertEqual(part.device, dev) - self.assertEqual(usage.total, total) - # 10 MB tollerance - if abs(usage.free - free) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % (usage.free, free)) - if abs(usage.used - used) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % (usage.used, used)) - - def test_memory_maps(self): - sproc = get_test_subprocess() - time.sleep(1) - p = psutil.Process(sproc.pid) - maps = p.memory_maps(grouped=False) - pmap = sh('pmap -x %s' % p.pid).split('\n') - # get rid of header - del pmap[0] - del pmap[0] - while maps and pmap: - this = maps.pop(0) - other = pmap.pop(0) - addr, _, rss, dirty, mode, path = other.split(None, 5) - if not path.startswith('[') and not path.endswith(']'): - self.assertEqual(path, os.path.basename(this.path)) - self.assertEqual(int(rss) * 1024, this.rss) - # test only rwx chars, ignore 's' and 'p' - self.assertEqual(mode[:3], this.perms[:3]) - - def test_vmem_total(self): - lines = sh('free').split('\n')[1:] - total = int(lines[0].split()[1]) * 1024 - self.assertEqual(total, psutil.virtual_memory().total) - - @retry_before_failing() - def test_vmem_used(self): - lines = sh('free').split('\n')[1:] - used = int(lines[0].split()[2]) * 1024 - self.assertAlmostEqual(used, psutil.virtual_memory().used, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_free(self): - lines = sh('free').split('\n')[1:] - free = int(lines[0].split()[3]) * 1024 - self.assertAlmostEqual(free, psutil.virtual_memory().free, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_buffers(self): - lines = sh('free').split('\n')[1:] - buffers = int(lines[0].split()[5]) * 1024 - self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_cached(self): - lines = sh('free').split('\n')[1:] - cached = int(lines[0].split()[6]) * 1024 - self.assertAlmostEqual(cached, psutil.virtual_memory().cached, - delta=TOLERANCE) - - def test_swapmem_total(self): - lines = sh('free').split('\n')[1:] - total = int(lines[2].split()[1]) * 1024 - self.assertEqual(total, psutil.swap_memory().total) - - @retry_before_failing() - def test_swapmem_used(self): - lines = sh('free').split('\n')[1:] - used = int(lines[2].split()[2]) * 1024 - self.assertAlmostEqual(used, psutil.swap_memory().used, - delta=TOLERANCE) - - @retry_before_failing() - def test_swapmem_free(self): - lines = sh('free').split('\n')[1:] - free = int(lines[2].split()[3]) * 1024 - self.assertAlmostEqual(free, psutil.swap_memory().free, - delta=TOLERANCE) - - @unittest.skipIf(TRAVIS, "unknown failure on travis") - def test_cpu_times(self): - fields = psutil.cpu_times()._fields - kernel_ver = re.findall('\d+\.\d+\.\d+', os.uname()[2])[0] - kernel_ver_info = tuple(map(int, kernel_ver.split('.'))) - if kernel_ver_info >= (2, 6, 11): - self.assertIn('steal', fields) - else: - self.assertNotIn('steal', fields) - if kernel_ver_info >= (2, 6, 24): - self.assertIn('guest', fields) - else: - self.assertNotIn('guest', fields) - if kernel_ver_info >= (3, 2, 0): - self.assertIn('guest_nice', fields) - else: - self.assertNotIn('guest_nice', fields) - - def test_net_if_addrs_ips(self): - for name, addrs in psutil.net_if_addrs().items(): - for addr in addrs: - if addr.family == psutil.AF_LINK: - self.assertEqual(addr.address, get_mac_address(name)) - elif addr.family == socket.AF_INET: - self.assertEqual(addr.address, get_ipv4_address(name)) - # TODO: test for AF_INET6 family - - @unittest.skipUnless(which('ip'), "'ip' utility not available") - @unittest.skipIf(TRAVIS, "skipped on Travis") - def test_net_if_names(self): - out = sh("ip addr").strip() - nics = psutil.net_if_addrs() - found = 0 - for line in out.split('\n'): - line = line.strip() - if re.search("^\d+:", line): - found += 1 - name = line.split(':')[1].strip() - self.assertIn(name, nics.keys()) - self.assertEqual(len(nics), found, msg="%s\n---\n%s" % ( - pprint.pformat(nics), out)) - - @unittest.skipUnless(which("nproc"), "nproc utility not available") - def test_cpu_count_logical_w_nproc(self): - num = int(sh("nproc --all")) - self.assertEqual(psutil.cpu_count(logical=True), num) - - @unittest.skipUnless(which("lscpu"), "lscpu utility not available") - def test_cpu_count_logical_w_lscpu(self): - out = sh("lscpu -p") - num = len([x for x in out.split('\n') if not x.startswith('#')]) - self.assertEqual(psutil.cpu_count(logical=True), num) - - # --- mocked tests - - def test_virtual_memory_mocked_warnings(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - with warnings.catch_warnings(record=True) as ws: - warnings.simplefilter("always") - ret = psutil._pslinux.virtual_memory() - assert m.called - self.assertEqual(len(ws), 1) - w = ws[0] - self.assertTrue(w.filename.endswith('psutil/_pslinux.py')) - self.assertIn( - "'cached', 'active' and 'inactive' memory stats couldn't " - "be determined", str(w.message)) - self.assertEqual(ret.cached, 0) - self.assertEqual(ret.active, 0) - self.assertEqual(ret.inactive, 0) - - def test_swap_memory_mocked_warnings(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - with warnings.catch_warnings(record=True) as ws: - warnings.simplefilter("always") - ret = psutil._pslinux.swap_memory() - assert m.called - self.assertEqual(len(ws), 1) - w = ws[0] - self.assertTrue(w.filename.endswith('psutil/_pslinux.py')) - self.assertIn( - "'sin' and 'sout' swap memory stats couldn't " - "be determined", str(w.message)) - self.assertEqual(ret.sin, 0) - self.assertEqual(ret.sout, 0) - - def test_cpu_count_logical_mocked(self): - import psutil._pslinux - original = psutil._pslinux.cpu_count_logical() - # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in - # order to cause the parsing of /proc/cpuinfo and /proc/stat. - with mock.patch( - 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m: - self.assertEqual(psutil._pslinux.cpu_count_logical(), original) - assert m.called - - # Let's have open() return emtpy data and make sure None is - # returned ('cause we mimick os.cpu_count()). - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertIsNone(psutil._pslinux.cpu_count_logical()) - self.assertEqual(m.call_count, 2) - # /proc/stat should be the last one - self.assertEqual(m.call_args[0][0], '/proc/stat') - - # Let's push this a bit further and make sure /proc/cpuinfo - # parsing works as expected. - with open('/proc/cpuinfo', 'rb') as f: - cpuinfo_data = f.read() - fake_file = io.BytesIO(cpuinfo_data) - with mock.patch('psutil._pslinux.open', - return_value=fake_file, create=True) as m: - self.assertEqual(psutil._pslinux.cpu_count_logical(), original) - - def test_cpu_count_physical_mocked(self): - # Have open() return emtpy data and make sure None is returned - # ('cause we want to mimick os.cpu_count()) - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertIsNone(psutil._pslinux.cpu_count_physical()) - assert m.called - - def test_proc_open_files_file_gone(self): - # simulates a file which gets deleted during open_files() - # execution - p = psutil.Process() - files = p.open_files() - with tempfile.NamedTemporaryFile(): - # give the kernel some time to see the new file - call_until(p.open_files, "len(ret) != %i" % len(files)) - with mock.patch('psutil._pslinux.os.readlink', - side_effect=OSError(errno.ENOENT, "")) as m: - files = p.open_files() - assert not files - assert m.called - # also simulate the case where os.readlink() returns EINVAL - # in which case psutil is supposed to 'continue' - with mock.patch('psutil._pslinux.os.readlink', - side_effect=OSError(errno.EINVAL, "")) as m: - self.assertEqual(p.open_files(), []) - assert m.called - - def test_proc_terminal_mocked(self): - with mock.patch('psutil._pslinux._psposix._get_terminal_map', - return_value={}) as m: - self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal()) - assert m.called - - def test_proc_num_ctx_switches_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).num_ctx_switches) - assert m.called - - def test_proc_num_threads_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).num_threads) - assert m.called - - def test_proc_ppid_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).ppid) - assert m.called - - def test_proc_uids_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).uids) - assert m.called - - def test_proc_gids_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).gids) - assert m.called - - def test_proc_cmdline_mocked(self): - # see: https://github.com/giampaolo/psutil/issues/639 - p = psutil.Process() - fake_file = io.StringIO(u('foo\x00bar\x00')) - with mock.patch('psutil._pslinux.open', - return_value=fake_file, create=True) as m: - p.cmdline() == ['foo', 'bar'] - assert m.called - fake_file = io.StringIO(u('foo\x00bar\x00\x00')) - with mock.patch('psutil._pslinux.open', - return_value=fake_file, create=True) as m: - p.cmdline() == ['foo', 'bar', ''] - assert m.called - - def test_proc_io_counters_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - NotImplementedError, - psutil._pslinux.Process(os.getpid()).io_counters) - assert m.called - - def test_boot_time_mocked(self): - with mock.patch('psutil._pslinux.open', create=True) as m: - self.assertRaises( - RuntimeError, - psutil._pslinux.boot_time) - assert m.called - - def test_users_mocked(self): - # Make sure ':0' and ':0.0' (returned by C ext) are converted - # to 'localhost'. - with mock.patch('psutil._pslinux.cext.users', - return_value=[('giampaolo', 'pts/2', ':0', - 1436573184.0, True)]) as m: - self.assertEqual(psutil.users()[0].host, 'localhost') - assert m.called - with mock.patch('psutil._pslinux.cext.users', - return_value=[('giampaolo', 'pts/2', ':0.0', - 1436573184.0, True)]) as m: - self.assertEqual(psutil.users()[0].host, 'localhost') - assert m.called - # ...otherwise it should be returned as-is - with mock.patch('psutil._pslinux.cext.users', - return_value=[('giampaolo', 'pts/2', 'foo', - 1436573184.0, True)]) as m: - self.assertEqual(psutil.users()[0].host, 'foo') - assert m.called - - def test_disk_partitions_mocked(self): - # Test that ZFS partitions are returned. - with open("/proc/filesystems", "r") as f: - data = f.read() - if 'zfs' in data: - for part in psutil.disk_partitions(): - if part.fstype == 'zfs': - break - else: - self.fail("couldn't find any ZFS partition") - else: - # No ZFS partitions on this system. Let's fake one. - fake_file = io.StringIO(u("nodev\tzfs\n")) - with mock.patch('psutil._pslinux.open', - return_value=fake_file, create=True) as m1: - with mock.patch( - 'psutil._pslinux.cext.disk_partitions', - return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2: - ret = psutil.disk_partitions() - assert m1.called - assert m2.called - assert ret - self.assertEqual(ret[0].fstype, 'zfs') - - # --- tests for specific kernel versions - - @unittest.skipUnless( - get_kernel_version() >= (2, 6, 36), - "prlimit() not available on this Linux kernel version") - def test_prlimit_availability(self): - # prlimit() should be available starting from kernel 2.6.36 - p = psutil.Process(os.getpid()) - p.rlimit(psutil.RLIMIT_NOFILE) - # if prlimit() is supported *at least* these constants should - # be available - self.assertTrue(hasattr(psutil, "RLIM_INFINITY")) - self.assertTrue(hasattr(psutil, "RLIMIT_AS")) - self.assertTrue(hasattr(psutil, "RLIMIT_CORE")) - self.assertTrue(hasattr(psutil, "RLIMIT_CPU")) - self.assertTrue(hasattr(psutil, "RLIMIT_DATA")) - self.assertTrue(hasattr(psutil, "RLIMIT_FSIZE")) - self.assertTrue(hasattr(psutil, "RLIMIT_LOCKS")) - self.assertTrue(hasattr(psutil, "RLIMIT_MEMLOCK")) - self.assertTrue(hasattr(psutil, "RLIMIT_NOFILE")) - self.assertTrue(hasattr(psutil, "RLIMIT_NPROC")) - self.assertTrue(hasattr(psutil, "RLIMIT_RSS")) - self.assertTrue(hasattr(psutil, "RLIMIT_STACK")) - - @unittest.skipUnless( - get_kernel_version() >= (3, 0), - "prlimit constants not available on this Linux kernel version") - def test_resource_consts_kernel_v(self): - # more recent constants - self.assertTrue(hasattr(psutil, "RLIMIT_MSGQUEUE")) - self.assertTrue(hasattr(psutil, "RLIMIT_NICE")) - self.assertTrue(hasattr(psutil, "RLIMIT_RTPRIO")) - self.assertTrue(hasattr(psutil, "RLIMIT_RTTIME")) - self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING")) - - -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - -if __name__ == '__main__': - if not main(): - sys.exit(1) diff --git a/python/psutil/test/_osx.py b/python/psutil/test/_osx.py deleted file mode 100644 index 6e6e4380e..000000000 --- a/python/psutil/test/_osx.py +++ /dev/null @@ -1,160 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""OSX specific tests. These are implicitly run by test_psutil.py.""" - -import os -import re -import subprocess -import sys -import time - -import psutil - -from psutil._compat import PY3 -from test_psutil import (TOLERANCE, OSX, sh, get_test_subprocess, - reap_children, retry_before_failing, unittest) - - -PAGESIZE = os.sysconf("SC_PAGE_SIZE") - - -def sysctl(cmdline): - """Expects a sysctl command with an argument and parse the result - returning only the value of interest. - """ - p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) - result = p.communicate()[0].strip().split()[1] - if PY3: - result = str(result, sys.stdout.encoding) - try: - return int(result) - except ValueError: - return result - - -def vm_stat(field): - """Wrapper around 'vm_stat' cmdline utility.""" - out = sh('vm_stat') - for line in out.split('\n'): - if field in line: - break - else: - raise ValueError("line not found") - return int(re.search('\d+', line).group(0)) * PAGESIZE - - -@unittest.skipUnless(OSX, "not an OSX system") -class OSXSpecificTestCase(unittest.TestCase): - - @classmethod - def setUpClass(cls): - cls.pid = get_test_subprocess().pid - - @classmethod - def tearDownClass(cls): - reap_children() - - def test_process_create_time(self): - cmdline = "ps -o lstart -p %s" % self.pid - p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE) - output = p.communicate()[0] - if PY3: - output = str(output, sys.stdout.encoding) - start_ps = output.replace('STARTED', '').strip() - start_psutil = psutil.Process(self.pid).create_time() - start_psutil = time.strftime("%a %b %e %H:%M:%S %Y", - time.localtime(start_psutil)) - self.assertEqual(start_ps, start_psutil) - - def test_disks(self): - # test psutil.disk_usage() and psutil.disk_partitions() - # against "df -a" - def df(path): - out = sh('df -k "%s"' % path).strip() - lines = out.split('\n') - lines.pop(0) - line = lines.pop(0) - dev, total, used, free = line.split()[:4] - if dev == 'none': - dev = '' - total = int(total) * 1024 - used = int(used) * 1024 - free = int(free) * 1024 - return dev, total, used, free - - for part in psutil.disk_partitions(all=False): - usage = psutil.disk_usage(part.mountpoint) - dev, total, used, free = df(part.mountpoint) - self.assertEqual(part.device, dev) - self.assertEqual(usage.total, total) - # 10 MB tollerance - if abs(usage.free - free) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % usage.free, free) - if abs(usage.used - used) > 10 * 1024 * 1024: - self.fail("psutil=%s, df=%s" % usage.used, used) - - # --- virtual mem - - def test_vmem_total(self): - sysctl_hwphymem = sysctl('sysctl hw.memsize') - self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total) - - @retry_before_failing() - def test_vmem_free(self): - num = vm_stat("free") - self.assertAlmostEqual(psutil.virtual_memory().free, num, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_active(self): - num = vm_stat("active") - self.assertAlmostEqual(psutil.virtual_memory().active, num, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_inactive(self): - num = vm_stat("inactive") - self.assertAlmostEqual(psutil.virtual_memory().inactive, num, - delta=TOLERANCE) - - @retry_before_failing() - def test_vmem_wired(self): - num = vm_stat("wired") - self.assertAlmostEqual(psutil.virtual_memory().wired, num, - delta=TOLERANCE) - - # --- swap mem - - def test_swapmem_sin(self): - num = vm_stat("Pageins") - self.assertEqual(psutil.swap_memory().sin, num) - - def test_swapmem_sout(self): - num = vm_stat("Pageouts") - self.assertEqual(psutil.swap_memory().sout, num) - - def test_swapmem_total(self): - tot1 = psutil.swap_memory().total - tot2 = 0 - # OSX uses multiple cache files: - # http://en.wikipedia.org/wiki/Paging#OS_X - for name in os.listdir("/var/vm/"): - file = os.path.join("/var/vm", name) - if os.path.isfile(file): - tot2 += os.path.getsize(file) - self.assertEqual(tot1, tot2) - - -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - -if __name__ == '__main__': - if not main(): - sys.exit(1) diff --git a/python/psutil/test/_posix.py b/python/psutil/test/_posix.py deleted file mode 100644 index e6c56aac3..000000000 --- a/python/psutil/test/_posix.py +++ /dev/null @@ -1,258 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""POSIX specific tests. These are implicitly run by test_psutil.py.""" - -import datetime -import os -import subprocess -import sys -import time - -import psutil - -from psutil._compat import PY3, callable -from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS -from test_psutil import (get_test_subprocess, skip_on_access_denied, - retry_before_failing, reap_children, sh, unittest, - get_kernel_version, wait_for_pid) - - -def ps(cmd): - """Expects a ps command with a -o argument and parse the result - returning only the value of interest. - """ - if not LINUX: - cmd = cmd.replace(" --no-headers ", " ") - if SUNOS: - cmd = cmd.replace("-o command", "-o comm") - cmd = cmd.replace("-o start", "-o stime") - p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE) - output = p.communicate()[0].strip() - if PY3: - output = str(output, sys.stdout.encoding) - if not LINUX: - output = output.split('\n')[1].strip() - try: - return int(output) - except ValueError: - return output - - -@unittest.skipUnless(POSIX, "not a POSIX system") -class PosixSpecificTestCase(unittest.TestCase): - """Compare psutil results against 'ps' command line utility.""" - - @classmethod - def setUpClass(cls): - cls.pid = get_test_subprocess([PYTHON, "-E", "-O"], - stdin=subprocess.PIPE).pid - wait_for_pid(cls.pid) - - @classmethod - def tearDownClass(cls): - reap_children() - - # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps - - def test_process_parent_pid(self): - ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid) - ppid_psutil = psutil.Process(self.pid).ppid() - self.assertEqual(ppid_ps, ppid_psutil) - - def test_process_uid(self): - uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid) - uid_psutil = psutil.Process(self.pid).uids().real - self.assertEqual(uid_ps, uid_psutil) - - def test_process_gid(self): - gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid) - gid_psutil = psutil.Process(self.pid).gids().real - self.assertEqual(gid_ps, gid_psutil) - - def test_process_username(self): - username_ps = ps("ps --no-headers -o user -p %s" % self.pid) - username_psutil = psutil.Process(self.pid).username() - self.assertEqual(username_ps, username_psutil) - - @skip_on_access_denied() - @retry_before_failing() - def test_process_rss_memory(self): - # give python interpreter some time to properly initialize - # so that the results are the same - time.sleep(0.1) - rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid) - rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024 - self.assertEqual(rss_ps, rss_psutil) - - @skip_on_access_denied() - @retry_before_failing() - def test_process_vsz_memory(self): - # give python interpreter some time to properly initialize - # so that the results are the same - time.sleep(0.1) - vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid) - vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024 - self.assertEqual(vsz_ps, vsz_psutil) - - def test_process_name(self): - # use command + arg since "comm" keyword not supported on all platforms - name_ps = ps("ps --no-headers -o command -p %s" % ( - self.pid)).split(' ')[0] - # remove path if there is any, from the command - name_ps = os.path.basename(name_ps).lower() - name_psutil = psutil.Process(self.pid).name().lower() - self.assertEqual(name_ps, name_psutil) - - @unittest.skipIf(OSX or BSD, - 'ps -o start not available') - def test_process_create_time(self): - time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0] - time_psutil = psutil.Process(self.pid).create_time() - time_psutil_tstamp = datetime.datetime.fromtimestamp( - time_psutil).strftime("%H:%M:%S") - # sometimes ps shows the time rounded up instead of down, so we check - # for both possible values - round_time_psutil = round(time_psutil) - round_time_psutil_tstamp = datetime.datetime.fromtimestamp( - round_time_psutil).strftime("%H:%M:%S") - self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp]) - - def test_process_exe(self): - ps_pathname = ps("ps --no-headers -o command -p %s" % - self.pid).split(' ')[0] - psutil_pathname = psutil.Process(self.pid).exe() - try: - self.assertEqual(ps_pathname, psutil_pathname) - except AssertionError: - # certain platforms such as BSD are more accurate returning: - # "/usr/local/bin/python2.7" - # ...instead of: - # "/usr/local/bin/python" - # We do not want to consider this difference in accuracy - # an error. - adjusted_ps_pathname = ps_pathname[:len(ps_pathname)] - self.assertEqual(ps_pathname, adjusted_ps_pathname) - - def test_process_cmdline(self): - ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid) - psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline()) - if SUNOS: - # ps on Solaris only shows the first part of the cmdline - psutil_cmdline = psutil_cmdline.split(" ")[0] - self.assertEqual(ps_cmdline, psutil_cmdline) - - @retry_before_failing() - def test_pids(self): - # Note: this test might fail if the OS is starting/killing - # other processes in the meantime - if SUNOS: - cmd = ["ps", "ax"] - else: - cmd = ["ps", "ax", "-o", "pid"] - p = get_test_subprocess(cmd, stdout=subprocess.PIPE) - output = p.communicate()[0].strip() - if PY3: - output = str(output, sys.stdout.encoding) - pids_ps = [] - for line in output.split('\n')[1:]: - if line: - pid = int(line.split()[0].strip()) - pids_ps.append(pid) - # remove ps subprocess pid which is supposed to be dead in meantime - pids_ps.remove(p.pid) - pids_psutil = psutil.pids() - pids_ps.sort() - pids_psutil.sort() - - # on OSX ps doesn't show pid 0 - if OSX and 0 not in pids_ps: - pids_ps.insert(0, 0) - - if pids_ps != pids_psutil: - difference = [x for x in pids_psutil if x not in pids_ps] + \ - [x for x in pids_ps if x not in pids_psutil] - self.fail("difference: " + str(difference)) - - # for some reason ifconfig -a does not report all interfaces - # returned by psutil - @unittest.skipIf(SUNOS, "test not reliable on SUNOS") - @unittest.skipIf(TRAVIS, "test not reliable on Travis") - def test_nic_names(self): - p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE) - output = p.communicate()[0].strip() - if PY3: - output = str(output, sys.stdout.encoding) - for nic in psutil.net_io_counters(pernic=True).keys(): - for line in output.split(): - if line.startswith(nic): - break - else: - self.fail( - "couldn't find %s nic in 'ifconfig -a' output\n%s" % ( - nic, output)) - - @retry_before_failing() - def test_users(self): - out = sh("who") - lines = out.split('\n') - users = [x.split()[0] for x in lines] - self.assertEqual(len(users), len(psutil.users())) - terminals = [x.split()[1] for x in lines] - for u in psutil.users(): - self.assertTrue(u.name in users, u.name) - self.assertTrue(u.terminal in terminals, u.terminal) - - def test_fds_open(self): - # Note: this fails from time to time; I'm keen on thinking - # it doesn't mean something is broken - def call(p, attr): - args = () - attr = getattr(p, name, None) - if attr is not None and callable(attr): - if name == 'rlimit': - args = (psutil.RLIMIT_NOFILE,) - attr(*args) - else: - attr - - p = psutil.Process(os.getpid()) - failures = [] - ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice', - 'send_signal', 'wait', 'children', 'as_dict'] - if LINUX and get_kernel_version() < (2, 6, 36): - ignored_names.append('rlimit') - if LINUX and get_kernel_version() < (2, 6, 23): - ignored_names.append('num_ctx_switches') - for name in dir(psutil.Process): - if (name.startswith('_') or name in ignored_names): - continue - else: - try: - num1 = p.num_fds() - for x in range(2): - call(p, name) - num2 = p.num_fds() - except psutil.AccessDenied: - pass - else: - if abs(num2 - num1) > 1: - fail = "failure while processing Process.%s method " \ - "(before=%s, after=%s)" % (name, num1, num2) - failures.append(fail) - if failures: - self.fail('\n' + '\n'.join(failures)) - - -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - -if __name__ == '__main__': - if not main(): - sys.exit(1) diff --git a/python/psutil/test/_sunos.py b/python/psutil/test/_sunos.py deleted file mode 100644 index 3d54ccd8c..000000000 --- a/python/psutil/test/_sunos.py +++ /dev/null @@ -1,48 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Sun OS specific tests. These are implicitly run by test_psutil.py.""" - -import sys -import os - -from test_psutil import SUNOS, sh, unittest -import psutil - - -@unittest.skipUnless(SUNOS, "not a SunOS system") -class SunOSSpecificTestCase(unittest.TestCase): - - def test_swap_memory(self): - out = sh('env PATH=/usr/sbin:/sbin:%s swap -l -k' % os.environ['PATH']) - lines = out.strip().split('\n')[1:] - if not lines: - raise ValueError('no swap device(s) configured') - total = free = 0 - for line in lines: - line = line.split() - t, f = line[-2:] - t = t.replace('K', '') - f = f.replace('K', '') - total += int(int(t) * 1024) - free += int(int(f) * 1024) - used = total - free - - psutil_swap = psutil.swap_memory() - self.assertEqual(psutil_swap.total, total) - self.assertEqual(psutil_swap.used, used) - self.assertEqual(psutil_swap.free, free) - - -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(SunOSSpecificTestCase)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - -if __name__ == '__main__': - if not main(): - sys.exit(1) diff --git a/python/psutil/test/_windows.py b/python/psutil/test/_windows.py deleted file mode 100644 index b7477bfeb..000000000 --- a/python/psutil/test/_windows.py +++ /dev/null @@ -1,464 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -"""Windows specific tests. These are implicitly run by test_psutil.py.""" - -import errno -import os -import platform -import signal -import subprocess -import sys -import time -import traceback - -from test_psutil import APPVEYOR, WINDOWS -from test_psutil import get_test_subprocess, reap_children, unittest - -import mock -try: - import wmi -except ImportError: - wmi = None -try: - import win32api - import win32con -except ImportError: - win32api = win32con = None - -from psutil._compat import PY3, callable, long -import psutil - - -cext = psutil._psplatform.cext - - -def wrap_exceptions(fun): - def wrapper(self, *args, **kwargs): - try: - return fun(self, *args, **kwargs) - except OSError as err: - from psutil._pswindows import ACCESS_DENIED_SET - if err.errno in ACCESS_DENIED_SET: - raise psutil.AccessDenied(None, None) - if err.errno == errno.ESRCH: - raise psutil.NoSuchProcess(None, None) - raise - return wrapper - - -@unittest.skipUnless(WINDOWS, "not a Windows system") -class WindowsSpecificTestCase(unittest.TestCase): - - @classmethod - def setUpClass(cls): - cls.pid = get_test_subprocess().pid - - @classmethod - def tearDownClass(cls): - reap_children() - - def test_issue_24(self): - p = psutil.Process(0) - self.assertRaises(psutil.AccessDenied, p.kill) - - def test_special_pid(self): - p = psutil.Process(4) - self.assertEqual(p.name(), 'System') - # use __str__ to access all common Process properties to check - # that nothing strange happens - str(p) - p.username() - self.assertTrue(p.create_time() >= 0.0) - try: - rss, vms = p.memory_info() - except psutil.AccessDenied: - # expected on Windows Vista and Windows 7 - if not platform.uname()[1] in ('vista', 'win-7', 'win7'): - raise - else: - self.assertTrue(rss > 0) - - def test_send_signal(self): - p = psutil.Process(self.pid) - self.assertRaises(ValueError, p.send_signal, signal.SIGINT) - - def test_nic_names(self): - p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE) - out = p.communicate()[0] - if PY3: - out = str(out, sys.stdout.encoding) - nics = psutil.net_io_counters(pernic=True).keys() - for nic in nics: - if "pseudo-interface" in nic.replace(' ', '-').lower(): - continue - if nic not in out: - self.fail( - "%r nic wasn't found in 'ipconfig /all' output" % nic) - - def test_exe(self): - for p in psutil.process_iter(): - try: - self.assertEqual(os.path.basename(p.exe()), p.name()) - except psutil.Error: - pass - - # --- Process class tests - - @unittest.skipIf(wmi is None, "wmi module is not installed") - def test_process_name(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - self.assertEqual(p.name(), w.Caption) - - @unittest.skipIf(wmi is None, "wmi module is not installed") - def test_process_exe(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - # Note: wmi reports the exe as a lower case string. - # Being Windows paths case-insensitive we ignore that. - self.assertEqual(p.exe().lower(), w.ExecutablePath.lower()) - - @unittest.skipIf(wmi is None, "wmi module is not installed") - def test_process_cmdline(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - self.assertEqual(' '.join(p.cmdline()), - w.CommandLine.replace('"', '')) - - @unittest.skipIf(wmi is None, "wmi module is not installed") - def test_process_username(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - domain, _, username = w.GetOwner() - username = "%s\\%s" % (domain, username) - self.assertEqual(p.username(), username) - - @unittest.skipIf(wmi is None, "wmi module is not installed") - def test_process_rss_memory(self): - time.sleep(0.1) - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - rss = p.memory_info().rss - self.assertEqual(rss, int(w.WorkingSetSize)) - - @unittest.skipIf(wmi is None, "wmi module is not installed") - def test_process_vms_memory(self): - time.sleep(0.1) - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - vms = p.memory_info().vms - # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx - # ...claims that PageFileUsage is represented in Kilo - # bytes but funnily enough on certain platforms bytes are - # returned instead. - wmi_usage = int(w.PageFileUsage) - if (vms != wmi_usage) and (vms != wmi_usage * 1024): - self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms)) - - @unittest.skipIf(wmi is None, "wmi module is not installed") - def test_process_create_time(self): - w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - p = psutil.Process(self.pid) - wmic_create = str(w.CreationDate.split('.')[0]) - psutil_create = time.strftime("%Y%m%d%H%M%S", - time.localtime(p.create_time())) - self.assertEqual(wmic_create, psutil_create) - - # --- psutil namespace functions and constants tests - - @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ, - 'NUMBER_OF_PROCESSORS env var is not available') - def test_cpu_count(self): - num_cpus = int(os.environ['NUMBER_OF_PROCESSORS']) - self.assertEqual(num_cpus, psutil.cpu_count()) - - @unittest.skipIf(wmi is None, "wmi module is not installed") - def test_total_phymem(self): - w = wmi.WMI().Win32_ComputerSystem()[0] - self.assertEqual(int(w.TotalPhysicalMemory), - psutil.virtual_memory().total) - - # @unittest.skipIf(wmi is None, "wmi module is not installed") - # def test__UPTIME(self): - # # _UPTIME constant is not public but it is used internally - # # as value to return for pid 0 creation time. - # # WMI behaves the same. - # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0] - # p = psutil.Process(0) - # wmic_create = str(w.CreationDate.split('.')[0]) - # psutil_create = time.strftime("%Y%m%d%H%M%S", - # time.localtime(p.create_time())) - # - - # Note: this test is not very reliable - @unittest.skipIf(wmi is None, "wmi module is not installed") - @unittest.skipIf(APPVEYOR, "test not relieable on appveyor") - def test_pids(self): - # Note: this test might fail if the OS is starting/killing - # other processes in the meantime - w = wmi.WMI().Win32_Process() - wmi_pids = set([x.ProcessId for x in w]) - psutil_pids = set(psutil.pids()) - self.assertEqual(wmi_pids, psutil_pids) - - @unittest.skipIf(wmi is None, "wmi module is not installed") - def test_disks(self): - ps_parts = psutil.disk_partitions(all=True) - wmi_parts = wmi.WMI().Win32_LogicalDisk() - for ps_part in ps_parts: - for wmi_part in wmi_parts: - if ps_part.device.replace('\\', '') == wmi_part.DeviceID: - if not ps_part.mountpoint: - # this is usually a CD-ROM with no disk inserted - break - try: - usage = psutil.disk_usage(ps_part.mountpoint) - except OSError as err: - if err.errno == errno.ENOENT: - # usually this is the floppy - break - else: - raise - self.assertEqual(usage.total, int(wmi_part.Size)) - wmi_free = int(wmi_part.FreeSpace) - self.assertEqual(usage.free, wmi_free) - # 10 MB tollerance - if abs(usage.free - wmi_free) > 10 * 1024 * 1024: - self.fail("psutil=%s, wmi=%s" % ( - usage.free, wmi_free)) - break - else: - self.fail("can't find partition %s" % repr(ps_part)) - - @unittest.skipIf(win32api is None, "pywin32 module is not installed") - def test_num_handles(self): - p = psutil.Process(os.getpid()) - before = p.num_handles() - handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION, - win32con.FALSE, os.getpid()) - after = p.num_handles() - self.assertEqual(after, before + 1) - win32api.CloseHandle(handle) - self.assertEqual(p.num_handles(), before) - - @unittest.skipIf(win32api is None, "pywin32 module is not installed") - def test_num_handles_2(self): - # Note: this fails from time to time; I'm keen on thinking - # it doesn't mean something is broken - def call(p, attr): - attr = getattr(p, name, None) - if attr is not None and callable(attr): - attr() - else: - attr - - p = psutil.Process(self.pid) - failures = [] - for name in dir(psutil.Process): - if name.startswith('_') \ - or name in ('terminate', 'kill', 'suspend', 'resume', - 'nice', 'send_signal', 'wait', 'children', - 'as_dict'): - continue - else: - try: - call(p, name) - num1 = p.num_handles() - call(p, name) - num2 = p.num_handles() - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - else: - if num2 > num1: - fail = \ - "failure while processing Process.%s method " \ - "(before=%s, after=%s)" % (name, num1, num2) - failures.append(fail) - if failures: - self.fail('\n' + '\n'.join(failures)) - - def test_name_always_available(self): - # On Windows name() is never supposed to raise AccessDenied, - # see https://github.com/giampaolo/psutil/issues/627 - for p in psutil.process_iter(): - try: - p.name() - except psutil.NoSuchProcess(): - pass - - -@unittest.skipUnless(WINDOWS, "not a Windows system") -class TestDualProcessImplementation(unittest.TestCase): - """ - Certain APIs on Windows have 2 internal implementations, one - based on documented Windows APIs, another one based - NtQuerySystemInformation() which gets called as fallback in - case the first fails because of limited permission error. - Here we test that the two methods return the exact same value, - see: - https://github.com/giampaolo/psutil/issues/304 - """ - - fun_names = [ - # function name, tolerance - ('proc_cpu_times', 0.2), - ('proc_create_time', 0.5), - ('proc_num_handles', 1), # 1 because impl #1 opens a handle - ('proc_memory_info', 1024), # KB - ('proc_io_counters', 0), - ] - - def test_compare_values(self): - def assert_ge_0(obj): - if isinstance(obj, tuple): - for value in obj: - self.assertGreaterEqual(value, 0, msg=obj) - elif isinstance(obj, (int, long, float)): - self.assertGreaterEqual(obj, 0) - else: - assert 0 # case not handled which needs to be fixed - - def compare_with_tolerance(ret1, ret2, tolerance): - if ret1 == ret2: - return - else: - if isinstance(ret2, (int, long, float)): - diff = abs(ret1 - ret2) - self.assertLessEqual(diff, tolerance) - elif isinstance(ret2, tuple): - for a, b in zip(ret1, ret2): - diff = abs(a - b) - self.assertLessEqual(diff, tolerance) - - from psutil._pswindows import ntpinfo - failures = [] - for p in psutil.process_iter(): - try: - nt = ntpinfo(*cext.proc_info(p.pid)) - except psutil.NoSuchProcess: - continue - assert_ge_0(nt) - - for name, tolerance in self.fun_names: - if name == 'proc_memory_info' and p.pid == os.getpid(): - continue - if name == 'proc_create_time' and p.pid in (0, 4): - continue - meth = wrap_exceptions(getattr(cext, name)) - try: - ret = meth(p.pid) - except (psutil.NoSuchProcess, psutil.AccessDenied): - continue - # compare values - try: - if name == 'proc_cpu_times': - compare_with_tolerance(ret[0], nt.user_time, tolerance) - compare_with_tolerance(ret[1], - nt.kernel_time, tolerance) - elif name == 'proc_create_time': - compare_with_tolerance(ret, nt.create_time, tolerance) - elif name == 'proc_num_handles': - compare_with_tolerance(ret, nt.num_handles, tolerance) - elif name == 'proc_io_counters': - compare_with_tolerance(ret[0], nt.io_rcount, tolerance) - compare_with_tolerance(ret[1], nt.io_wcount, tolerance) - compare_with_tolerance(ret[2], nt.io_rbytes, tolerance) - compare_with_tolerance(ret[3], nt.io_wbytes, tolerance) - elif name == 'proc_memory_info': - try: - rawtupl = cext.proc_memory_info_2(p.pid) - except psutil.NoSuchProcess: - continue - compare_with_tolerance(ret, rawtupl, tolerance) - except AssertionError: - trace = traceback.format_exc() - msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' % ( - trace, p.pid, name, ret, nt) - failures.append(msg) - break - - if failures: - self.fail('\n\n'.join(failures)) - - # --- - # same tests as above but mimicks the AccessDenied failure of - # the first (fast) method failing with AD. - # TODO: currently does not take tolerance into account. - - def test_name(self): - name = psutil.Process().name() - with mock.patch("psutil._psplatform.cext.proc_exe", - side_effect=psutil.AccessDenied(os.getpid())) as fun: - psutil.Process().name() == name - assert fun.called - - def test_memory_info(self): - mem = psutil.Process().memory_info() - with mock.patch("psutil._psplatform.cext.proc_memory_info", - side_effect=OSError(errno.EPERM, "msg")) as fun: - psutil.Process().memory_info() == mem - assert fun.called - - def test_create_time(self): - ctime = psutil.Process().create_time() - with mock.patch("psutil._psplatform.cext.proc_create_time", - side_effect=OSError(errno.EPERM, "msg")) as fun: - psutil.Process().create_time() == ctime - assert fun.called - - def test_cpu_times(self): - cpu_times = psutil.Process().cpu_times() - with mock.patch("psutil._psplatform.cext.proc_cpu_times", - side_effect=OSError(errno.EPERM, "msg")) as fun: - psutil.Process().cpu_times() == cpu_times - assert fun.called - - def test_io_counters(self): - io_counters = psutil.Process().io_counters() - with mock.patch("psutil._psplatform.cext.proc_io_counters", - side_effect=OSError(errno.EPERM, "msg")) as fun: - psutil.Process().io_counters() == io_counters - assert fun.called - - def test_num_handles(self): - io_counters = psutil.Process().io_counters() - with mock.patch("psutil._psplatform.cext.proc_io_counters", - side_effect=OSError(errno.EPERM, "msg")) as fun: - psutil.Process().io_counters() == io_counters - assert fun.called - - # --- other tests - - def test_compare_name_exe(self): - for p in psutil.process_iter(): - try: - a = os.path.basename(p.exe()) - b = p.name() - except (psutil.NoSuchProcess, psutil.AccessDenied): - pass - else: - self.assertEqual(a, b) - - def test_zombies(self): - # test that NPS is raised by the 2nd implementation in case a - # process no longer exists - ZOMBIE_PID = max(psutil.pids()) + 5000 - for name, _ in self.fun_names: - meth = wrap_exceptions(getattr(cext, name)) - self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID) - - -def main(): - test_suite = unittest.TestSuite() - test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase)) - test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - -if __name__ == '__main__': - if not main(): - sys.exit(1) diff --git a/python/psutil/test/test_memory_leaks.py b/python/psutil/test/test_memory_leaks.py deleted file mode 100644 index 6f02dc0ac..000000000 --- a/python/psutil/test/test_memory_leaks.py +++ /dev/null @@ -1,445 +0,0 @@ -#!/usr/bin/env python - -# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -""" -A test script which attempts to detect memory leaks by calling C -functions many times and compare process memory usage before and -after the calls. It might produce false positives. -""" - -import functools -import gc -import os -import socket -import sys -import threading -import time - -import psutil -import psutil._common - -from psutil._compat import xrange, callable -from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, BSD, TESTFN, - RLIMIT_SUPPORT, TRAVIS) -from test_psutil import (reap_children, supports_ipv6, safe_remove, - get_test_subprocess) - -if sys.version_info < (2, 7): - import unittest2 as unittest # https://pypi.python.org/pypi/unittest2 -else: - import unittest - - -LOOPS = 1000 -TOLERANCE = 4096 -SKIP_PYTHON_IMPL = True - - -def skip_if_linux(): - return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL, - "not worth being tested on LINUX (pure python)") - - -class Base(unittest.TestCase): - proc = psutil.Process() - - def execute(self, function, *args, **kwargs): - def call_many_times(): - for x in xrange(LOOPS - 1): - self.call(function, *args, **kwargs) - del x - gc.collect() - return self.get_mem() - - self.call(function, *args, **kwargs) - self.assertEqual(gc.garbage, []) - self.assertEqual(threading.active_count(), 1) - - # RSS comparison - # step 1 - rss1 = call_many_times() - # step 2 - rss2 = call_many_times() - - difference = rss2 - rss1 - if difference > TOLERANCE: - # This doesn't necessarily mean we have a leak yet. - # At this point we assume that after having called the - # function so many times the memory usage is stabilized - # and if there are no leaks it should not increase any - # more. - # Let's keep calling fun for 3 more seconds and fail if - # we notice any difference. - stop_at = time.time() + 3 - while True: - self.call(function, *args, **kwargs) - if time.time() >= stop_at: - break - del stop_at - gc.collect() - rss3 = self.get_mem() - difference = rss3 - rss2 - if rss3 > rss2: - self.fail("rss2=%s, rss3=%s, difference=%s" - % (rss2, rss3, difference)) - - def execute_w_exc(self, exc, function, *args, **kwargs): - kwargs['_exc'] = exc - self.execute(function, *args, **kwargs) - - def get_mem(self): - return psutil.Process().memory_info()[0] - - def call(self, function, *args, **kwargs): - raise NotImplementedError("must be implemented in subclass") - - -class TestProcessObjectLeaks(Base): - """Test leaks of Process class methods and properties""" - - def setUp(self): - gc.collect() - - def tearDown(self): - reap_children() - - def call(self, function, *args, **kwargs): - if callable(function): - if '_exc' in kwargs: - exc = kwargs.pop('_exc') - self.assertRaises(exc, function, *args, **kwargs) - else: - try: - function(*args, **kwargs) - except psutil.Error: - pass - else: - meth = getattr(self.proc, function) - if '_exc' in kwargs: - exc = kwargs.pop('_exc') - self.assertRaises(exc, meth, *args, **kwargs) - else: - try: - meth(*args, **kwargs) - except psutil.Error: - pass - - @skip_if_linux() - def test_name(self): - self.execute('name') - - @skip_if_linux() - def test_cmdline(self): - self.execute('cmdline') - - @skip_if_linux() - def test_exe(self): - self.execute('exe') - - @skip_if_linux() - def test_ppid(self): - self.execute('ppid') - - @unittest.skipUnless(POSIX, "POSIX only") - @skip_if_linux() - def test_uids(self): - self.execute('uids') - - @unittest.skipUnless(POSIX, "POSIX only") - @skip_if_linux() - def test_gids(self): - self.execute('gids') - - @skip_if_linux() - def test_status(self): - self.execute('status') - - def test_nice_get(self): - self.execute('nice') - - def test_nice_set(self): - niceness = psutil.Process().nice() - self.execute('nice', niceness) - - @unittest.skipUnless(hasattr(psutil.Process, 'ionice'), - "Linux and Windows Vista only") - def test_ionice_get(self): - self.execute('ionice') - - @unittest.skipUnless(hasattr(psutil.Process, 'ionice'), - "Linux and Windows Vista only") - def test_ionice_set(self): - if WINDOWS: - value = psutil.Process().ionice() - self.execute('ionice', value) - else: - from psutil._pslinux import cext - self.execute('ionice', psutil.IOPRIO_CLASS_NONE) - fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0) - self.execute_w_exc(OSError, fun) - - @unittest.skipIf(OSX or SUNOS, "feature not supported on this platform") - @skip_if_linux() - def test_io_counters(self): - self.execute('io_counters') - - @unittest.skipUnless(WINDOWS, "not worth being tested on posix") - def test_username(self): - self.execute('username') - - @skip_if_linux() - def test_create_time(self): - self.execute('create_time') - - @skip_if_linux() - def test_num_threads(self): - self.execute('num_threads') - - @unittest.skipUnless(WINDOWS, "Windows only") - def test_num_handles(self): - self.execute('num_handles') - - @unittest.skipUnless(POSIX, "POSIX only") - @skip_if_linux() - def test_num_fds(self): - self.execute('num_fds') - - @skip_if_linux() - def test_threads(self): - self.execute('threads') - - @skip_if_linux() - def test_cpu_times(self): - self.execute('cpu_times') - - @skip_if_linux() - def test_memory_info(self): - self.execute('memory_info') - - @skip_if_linux() - def test_memory_info_ex(self): - self.execute('memory_info_ex') - - @unittest.skipUnless(POSIX, "POSIX only") - @skip_if_linux() - def test_terminal(self): - self.execute('terminal') - - @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL, - "not worth being tested on POSIX (pure python)") - def test_resume(self): - self.execute('resume') - - @skip_if_linux() - def test_cwd(self): - self.execute('cwd') - - @unittest.skipUnless(WINDOWS or LINUX or BSD, - "Windows or Linux or BSD only") - def test_cpu_affinity_get(self): - self.execute('cpu_affinity') - - @unittest.skipUnless(WINDOWS or LINUX or BSD, - "Windows or Linux or BSD only") - def test_cpu_affinity_set(self): - affinity = psutil.Process().cpu_affinity() - self.execute('cpu_affinity', affinity) - if not TRAVIS: - self.execute_w_exc(ValueError, 'cpu_affinity', [-1]) - - @skip_if_linux() - def test_open_files(self): - safe_remove(TESTFN) # needed after UNIX socket test has run - with open(TESTFN, 'w'): - self.execute('open_files') - - # OSX implementation is unbelievably slow - @unittest.skipIf(OSX, "OSX implementation is too slow") - @skip_if_linux() - def test_memory_maps(self): - self.execute('memory_maps') - - @unittest.skipUnless(LINUX, "Linux only") - @unittest.skipUnless(LINUX and RLIMIT_SUPPORT, - "only available on Linux >= 2.6.36") - def test_rlimit_get(self): - self.execute('rlimit', psutil.RLIMIT_NOFILE) - - @unittest.skipUnless(LINUX, "Linux only") - @unittest.skipUnless(LINUX and RLIMIT_SUPPORT, - "only available on Linux >= 2.6.36") - def test_rlimit_set(self): - limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE) - self.execute('rlimit', psutil.RLIMIT_NOFILE, limit) - self.execute_w_exc(OSError, 'rlimit', -1) - - @skip_if_linux() - # Windows implementation is based on a single system-wide function - @unittest.skipIf(WINDOWS, "tested later") - def test_connections(self): - def create_socket(family, type): - sock = socket.socket(family, type) - sock.bind(('', 0)) - if type == socket.SOCK_STREAM: - sock.listen(1) - return sock - - socks = [] - socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM)) - socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM)) - if supports_ipv6(): - socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM)) - socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM)) - if hasattr(socket, 'AF_UNIX'): - safe_remove(TESTFN) - s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - s.bind(TESTFN) - s.listen(1) - socks.append(s) - kind = 'all' - # TODO: UNIX sockets are temporarily implemented by parsing - # 'pfiles' cmd output; we don't want that part of the code to - # be executed. - if SUNOS: - kind = 'inet' - try: - self.execute('connections', kind=kind) - finally: - for s in socks: - s.close() - - -p = get_test_subprocess() -DEAD_PROC = psutil.Process(p.pid) -DEAD_PROC.kill() -DEAD_PROC.wait() -del p - - -class TestProcessObjectLeaksZombie(TestProcessObjectLeaks): - """Same as above but looks for leaks occurring when dealing with - zombie processes raising NoSuchProcess exception. - """ - proc = DEAD_PROC - - def call(self, *args, **kwargs): - try: - TestProcessObjectLeaks.call(self, *args, **kwargs) - except psutil.NoSuchProcess: - pass - - if not POSIX: - def test_kill(self): - self.execute('kill') - - def test_terminate(self): - self.execute('terminate') - - def test_suspend(self): - self.execute('suspend') - - def test_resume(self): - self.execute('resume') - - def test_wait(self): - self.execute('wait') - - -class TestModuleFunctionsLeaks(Base): - """Test leaks of psutil module functions.""" - - def setUp(self): - gc.collect() - - def call(self, function, *args, **kwargs): - fun = getattr(psutil, function) - fun(*args, **kwargs) - - @skip_if_linux() - def test_cpu_count_logical(self): - psutil.cpu_count = psutil._psplatform.cpu_count_logical - self.execute('cpu_count') - - @skip_if_linux() - def test_cpu_count_physical(self): - psutil.cpu_count = psutil._psplatform.cpu_count_physical - self.execute('cpu_count') - - @skip_if_linux() - def test_boot_time(self): - self.execute('boot_time') - - @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL, - "not worth being tested on POSIX (pure python)") - def test_pid_exists(self): - self.execute('pid_exists', os.getpid()) - - def test_virtual_memory(self): - self.execute('virtual_memory') - - # TODO: remove this skip when this gets fixed - @unittest.skipIf(SUNOS, - "not worth being tested on SUNOS (uses a subprocess)") - def test_swap_memory(self): - self.execute('swap_memory') - - @skip_if_linux() - def test_cpu_times(self): - self.execute('cpu_times') - - @skip_if_linux() - def test_per_cpu_times(self): - self.execute('cpu_times', percpu=True) - - @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL, - "not worth being tested on POSIX (pure python)") - def test_disk_usage(self): - self.execute('disk_usage', '.') - - def test_disk_partitions(self): - self.execute('disk_partitions') - - @skip_if_linux() - def test_net_io_counters(self): - self.execute('net_io_counters') - - @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'), - '/proc/diskstats not available on this Linux version') - @skip_if_linux() - def test_disk_io_counters(self): - self.execute('disk_io_counters') - - # XXX - on Windows this produces a false positive - @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows") - def test_users(self): - self.execute('users') - - @unittest.skipIf(LINUX, - "not worth being tested on Linux (pure python)") - def test_net_connections(self): - self.execute('net_connections') - - def test_net_if_addrs(self): - self.execute('net_if_addrs') - - @unittest.skipIf(TRAVIS, "EPERM on travis") - def test_net_if_stats(self): - self.execute('net_if_stats') - - -def main(): - test_suite = unittest.TestSuite() - tests = [TestProcessObjectLeaksZombie, - TestProcessObjectLeaks, - TestModuleFunctionsLeaks] - for test in tests: - test_suite.addTest(unittest.makeSuite(test)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - -if __name__ == '__main__': - if not main(): - sys.exit(1) diff --git a/python/psutil/test/test_psutil.py b/python/psutil/test/test_psutil.py deleted file mode 100644 index 3b2e3587a..000000000 --- a/python/psutil/test/test_psutil.py +++ /dev/null @@ -1,3013 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved. -# Use of this source code is governed by a BSD-style license that can be -# found in the LICENSE file. - -""" -psutil test suite. Run it with: -$ make test - -If you're on Python < 2.7 unittest2 module must be installed first: -https://pypi.python.org/pypi/unittest2 -""" - -from __future__ import division - -import ast -import atexit -import collections -import contextlib -import datetime -import errno -import functools -import imp -import json -import os -import pickle -import pprint -import re -import select -import shutil -import signal -import socket -import stat -import subprocess -import sys -import tempfile -import textwrap -import threading -import time -import traceback -import types -import warnings -from socket import AF_INET, SOCK_STREAM, SOCK_DGRAM -try: - import ipaddress # python >= 3.3 -except ImportError: - ipaddress = None -try: - from unittest import mock # py3 -except ImportError: - import mock # requires "pip install mock" - -import psutil -from psutil._compat import PY3, callable, long, unicode - -if sys.version_info < (2, 7): - import unittest2 as unittest # https://pypi.python.org/pypi/unittest2 -else: - import unittest -if sys.version_info >= (3, 4): - import enum -else: - enum = None - - -# =================================================================== -# --- Constants -# =================================================================== - -# conf for retry_before_failing() decorator -NO_RETRIES = 10 -# bytes tolerance for OS memory related tests -TOLERANCE = 500 * 1024 # 500KB -# the timeout used in functions which have to wait -GLOBAL_TIMEOUT = 3 - -AF_INET6 = getattr(socket, "AF_INET6") -AF_UNIX = getattr(socket, "AF_UNIX", None) -PYTHON = os.path.realpath(sys.executable) -DEVNULL = open(os.devnull, 'r+') -TESTFN = os.path.join(os.getcwd(), "$testfile") -TESTFN_UNICODE = TESTFN + "Æ’Å‘Å‘" -TESTFILE_PREFIX = 'psutil-test-suite-' -if not PY3: - try: - TESTFN_UNICODE = unicode(TESTFN_UNICODE, sys.getfilesystemencoding()) - except UnicodeDecodeError: - TESTFN_UNICODE = TESTFN + "???" - -EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), - '..', 'examples')) - -POSIX = os.name == 'posix' -WINDOWS = os.name == 'nt' -if WINDOWS: - WIN_VISTA = (6, 0, 0) -LINUX = sys.platform.startswith("linux") -OSX = sys.platform.startswith("darwin") -BSD = sys.platform.startswith("freebsd") -SUNOS = sys.platform.startswith("sunos") -VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil) - if x.startswith('STATUS_')] -# whether we're running this test suite on Travis (https://travis-ci.org/) -TRAVIS = bool(os.environ.get('TRAVIS')) -# whether we're running this test suite on Appveyor for Windows -# (http://www.appveyor.com/) -APPVEYOR = bool(os.environ.get('APPVEYOR')) - -if TRAVIS or 'tox' in sys.argv[0]: - import ipaddress -if TRAVIS or APPVEYOR: - GLOBAL_TIMEOUT = GLOBAL_TIMEOUT * 4 - - -# =================================================================== -# --- Utility functions -# =================================================================== - -def cleanup(): - reap_children(search_all=True) - safe_remove(TESTFN) - try: - safe_rmdir(TESTFN_UNICODE) - except UnicodeEncodeError: - pass - for path in _testfiles: - safe_remove(path) - -atexit.register(cleanup) -atexit.register(lambda: DEVNULL.close()) - - -_subprocesses_started = set() - - -def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL, - stdin=DEVNULL, wait=False): - """Return a subprocess.Popen object to use in tests. - By default stdout and stderr are redirected to /dev/null and the - python interpreter is used as test process. - If 'wait' is True attemps to make sure the process is in a - reasonably initialized state. - """ - if cmd is None: - pyline = "" - if wait: - pyline += "open(r'%s', 'w'); " % TESTFN - pyline += "import time; time.sleep(60);" - cmd_ = [PYTHON, "-c", pyline] - else: - cmd_ = cmd - sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin) - if wait: - if cmd is None: - stop_at = time.time() + 3 - while stop_at > time.time(): - if os.path.exists(TESTFN): - break - time.sleep(0.001) - else: - warn("couldn't make sure test file was actually created") - else: - wait_for_pid(sproc.pid) - _subprocesses_started.add(psutil.Process(sproc.pid)) - return sproc - - -_testfiles = [] - - -def pyrun(src): - """Run python code 'src' in a separate interpreter. - Return interpreter subprocess. - """ - if PY3: - src = bytes(src, 'ascii') - with tempfile.NamedTemporaryFile( - prefix=TESTFILE_PREFIX, delete=False) as f: - _testfiles.append(f.name) - f.write(src) - f.flush() - subp = get_test_subprocess([PYTHON, f.name], stdout=None, - stderr=None) - wait_for_pid(subp.pid) - return subp - - -def warn(msg): - """Raise a warning msg.""" - warnings.warn(msg, UserWarning) - - -def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE): - """run cmd in a subprocess and return its output. - raises RuntimeError on error. - """ - p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr) - stdout, stderr = p.communicate() - if p.returncode != 0: - raise RuntimeError(stderr) - if stderr: - warn(stderr) - if PY3: - stdout = str(stdout, sys.stdout.encoding) - return stdout.strip() - - -def which(program): - """Same as UNIX which command. Return None on command not found.""" - def is_exe(fpath): - return os.path.isfile(fpath) and os.access(fpath, os.X_OK) - - fpath, fname = os.path.split(program) - if fpath: - if is_exe(program): - return program - else: - for path in os.environ["PATH"].split(os.pathsep): - exe_file = os.path.join(path, program) - if is_exe(exe_file): - return exe_file - return None - - -if POSIX: - def get_kernel_version(): - """Return a tuple such as (2, 6, 36).""" - s = "" - uname = os.uname()[2] - for c in uname: - if c.isdigit() or c == '.': - s += c - else: - break - if not s: - raise ValueError("can't parse %r" % uname) - minor = 0 - micro = 0 - nums = s.split('.') - major = int(nums[0]) - if len(nums) >= 2: - minor = int(nums[1]) - if len(nums) >= 3: - micro = int(nums[2]) - return (major, minor, micro) - - -if LINUX: - RLIMIT_SUPPORT = get_kernel_version() >= (2, 6, 36) -else: - RLIMIT_SUPPORT = False - - -def wait_for_pid(pid, timeout=GLOBAL_TIMEOUT): - """Wait for pid to show up in the process list then return. - Used in the test suite to give time the sub process to initialize. - """ - raise_at = time.time() + timeout - while True: - if pid in psutil.pids(): - # give it one more iteration to allow full initialization - time.sleep(0.01) - return - time.sleep(0.0001) - if time.time() >= raise_at: - raise RuntimeError("Timed out") - - -def wait_for_file(fname, timeout=GLOBAL_TIMEOUT, delete_file=True): - """Wait for a file to be written on disk.""" - stop_at = time.time() + 3 - while time.time() < stop_at: - try: - with open(fname, "r") as f: - data = f.read() - if not data: - continue - if delete_file: - os.remove(fname) - return data - except IOError: - time.sleep(0.001) - raise RuntimeError("timed out (couldn't read file)") - - -def reap_children(search_all=False): - """Kill any subprocess started by this test suite and ensure that - no zombies stick around to hog resources and create problems when - looking for refleaks. - """ - global _subprocesses_started - procs = _subprocesses_started.copy() - if search_all: - this_process = psutil.Process() - for p in this_process.children(recursive=True): - procs.add(p) - for p in procs: - try: - p.terminate() - except psutil.NoSuchProcess: - pass - gone, alive = psutil.wait_procs(procs, timeout=GLOBAL_TIMEOUT) - for p in alive: - warn("couldn't terminate process %s" % p) - try: - p.kill() - except psutil.NoSuchProcess: - pass - _, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT) - if alive: - warn("couldn't not kill processes %s" % str(alive)) - _subprocesses_started = set(alive) - - -def check_ip_address(addr, family): - """Attempts to check IP address's validity.""" - if enum and PY3: - assert isinstance(family, enum.IntEnum), family - if family == AF_INET: - octs = [int(x) for x in addr.split('.')] - assert len(octs) == 4, addr - for num in octs: - assert 0 <= num <= 255, addr - if ipaddress: - if not PY3: - addr = unicode(addr) - ipaddress.IPv4Address(addr) - elif family == AF_INET6: - assert isinstance(addr, str), addr - if ipaddress: - if not PY3: - addr = unicode(addr) - ipaddress.IPv6Address(addr) - elif family == psutil.AF_LINK: - assert re.match('([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr - else: - raise ValueError("unknown family %r", family) - - -def check_connection_ntuple(conn): - """Check validity of a connection namedtuple.""" - valid_conn_states = [getattr(psutil, x) for x in dir(psutil) if - x.startswith('CONN_')] - assert conn[0] == conn.fd - assert conn[1] == conn.family - assert conn[2] == conn.type - assert conn[3] == conn.laddr - assert conn[4] == conn.raddr - assert conn[5] == conn.status - assert conn.type in (SOCK_STREAM, SOCK_DGRAM), repr(conn.type) - assert conn.family in (AF_INET, AF_INET6, AF_UNIX), repr(conn.family) - assert conn.status in valid_conn_states, conn.status - - # check IP address and port sanity - for addr in (conn.laddr, conn.raddr): - if not addr: - continue - if conn.family in (AF_INET, AF_INET6): - assert isinstance(addr, tuple), addr - ip, port = addr - assert isinstance(port, int), port - assert 0 <= port <= 65535, port - check_ip_address(ip, conn.family) - elif conn.family == AF_UNIX: - assert isinstance(addr, (str, None)), addr - else: - raise ValueError("unknown family %r", conn.family) - - if conn.family in (AF_INET, AF_INET6): - # actually try to bind the local socket; ignore IPv6 - # sockets as their address might be represented as - # an IPv4-mapped-address (e.g. "::127.0.0.1") - # and that's rejected by bind() - if conn.family == AF_INET: - s = socket.socket(conn.family, conn.type) - with contextlib.closing(s): - try: - s.bind((conn.laddr[0], 0)) - except socket.error as err: - if err.errno != errno.EADDRNOTAVAIL: - raise - elif conn.family == AF_UNIX: - assert not conn.raddr, repr(conn.raddr) - assert conn.status == psutil.CONN_NONE, conn.status - - if getattr(conn, 'fd', -1) != -1: - assert conn.fd > 0, conn - if hasattr(socket, 'fromfd') and not WINDOWS: - try: - dupsock = socket.fromfd(conn.fd, conn.family, conn.type) - except (socket.error, OSError) as err: - if err.args[0] != errno.EBADF: - raise - else: - with contextlib.closing(dupsock): - assert dupsock.family == conn.family - assert dupsock.type == conn.type - - -def safe_remove(file): - "Convenience function for removing temporary test files" - try: - os.remove(file) - except OSError as err: - if err.errno != errno.ENOENT: - # file is being used by another process - if WINDOWS and isinstance(err, WindowsError) and err.errno == 13: - return - raise - - -def safe_rmdir(dir): - "Convenience function for removing temporary test directories" - try: - os.rmdir(dir) - except OSError as err: - if err.errno != errno.ENOENT: - raise - - -def call_until(fun, expr, timeout=GLOBAL_TIMEOUT): - """Keep calling function for timeout secs and exit if eval() - expression is True. - """ - stop_at = time.time() + timeout - while time.time() < stop_at: - ret = fun() - if eval(expr): - return ret - time.sleep(0.001) - raise RuntimeError('timed out (ret=%r)' % ret) - - -def retry_before_failing(ntimes=None): - """Decorator which runs a test function and retries N times before - actually failing. - """ - def decorator(fun): - @functools.wraps(fun) - def wrapper(*args, **kwargs): - for x in range(ntimes or NO_RETRIES): - try: - return fun(*args, **kwargs) - except AssertionError: - pass - raise - return wrapper - return decorator - - -def skip_on_access_denied(only_if=None): - """Decorator to Ignore AccessDenied exceptions.""" - def decorator(fun): - @functools.wraps(fun) - def wrapper(*args, **kwargs): - try: - return fun(*args, **kwargs) - except psutil.AccessDenied: - if only_if is not None: - if not only_if: - raise - msg = "%r was skipped because it raised AccessDenied" \ - % fun.__name__ - raise unittest.SkipTest(msg) - return wrapper - return decorator - - -def skip_on_not_implemented(only_if=None): - """Decorator to Ignore NotImplementedError exceptions.""" - def decorator(fun): - @functools.wraps(fun) - def wrapper(*args, **kwargs): - try: - return fun(*args, **kwargs) - except NotImplementedError: - if only_if is not None: - if not only_if: - raise - msg = "%r was skipped because it raised NotImplementedError" \ - % fun.__name__ - raise unittest.SkipTest(msg) - return wrapper - return decorator - - -def supports_ipv6(): - """Return True if IPv6 is supported on this platform.""" - if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"): - return False - sock = None - try: - sock = socket.socket(AF_INET6, SOCK_STREAM) - sock.bind(("::1", 0)) - except (socket.error, socket.gaierror): - return False - else: - return True - finally: - if sock is not None: - sock.close() - - -if WINDOWS: - def get_winver(): - wv = sys.getwindowsversion() - if hasattr(wv, 'service_pack_major'): # python >= 2.7 - sp = wv.service_pack_major or 0 - else: - r = re.search("\s\d$", wv[4]) - if r: - sp = int(r.group(0)) - else: - sp = 0 - return (wv[0], wv[1], sp) - - -class ThreadTask(threading.Thread): - """A thread object used for running process thread tests.""" - - def __init__(self): - threading.Thread.__init__(self) - self._running = False - self._interval = None - self._flag = threading.Event() - - def __repr__(self): - name = self.__class__.__name__ - return '<%s running=%s at %#x>' % (name, self._running, id(self)) - - def start(self, interval=0.001): - """Start thread and keep it running until an explicit - stop() request. Polls for shutdown every 'timeout' seconds. - """ - if self._running: - raise ValueError("already started") - self._interval = interval - threading.Thread.start(self) - self._flag.wait() - - def run(self): - self._running = True - self._flag.set() - while self._running: - time.sleep(self._interval) - - def stop(self): - """Stop thread execution and and waits until it is stopped.""" - if not self._running: - raise ValueError("already stopped") - self._running = False - self.join() - - -# =================================================================== -# --- System-related API tests -# =================================================================== - -class TestSystemAPIs(unittest.TestCase): - """Tests for system-related APIs.""" - - def setUp(self): - safe_remove(TESTFN) - - def tearDown(self): - reap_children() - - def test_process_iter(self): - self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()]) - sproc = get_test_subprocess() - self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()]) - p = psutil.Process(sproc.pid) - p.kill() - p.wait() - self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()]) - - def test_wait_procs(self): - def callback(p): - l.append(p.pid) - - l = [] - sproc1 = get_test_subprocess() - sproc2 = get_test_subprocess() - sproc3 = get_test_subprocess() - procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] - self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1) - self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1) - t = time.time() - gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback) - - self.assertLess(time.time() - t, 0.5) - self.assertEqual(gone, []) - self.assertEqual(len(alive), 3) - self.assertEqual(l, []) - for p in alive: - self.assertFalse(hasattr(p, 'returncode')) - - @retry_before_failing(30) - def test(procs, callback): - gone, alive = psutil.wait_procs(procs, timeout=0.03, - callback=callback) - self.assertEqual(len(gone), 1) - self.assertEqual(len(alive), 2) - return gone, alive - - sproc3.terminate() - gone, alive = test(procs, callback) - self.assertIn(sproc3.pid, [x.pid for x in gone]) - if POSIX: - self.assertEqual(gone.pop().returncode, signal.SIGTERM) - else: - self.assertEqual(gone.pop().returncode, 1) - self.assertEqual(l, [sproc3.pid]) - for p in alive: - self.assertFalse(hasattr(p, 'returncode')) - - @retry_before_failing(30) - def test(procs, callback): - gone, alive = psutil.wait_procs(procs, timeout=0.03, - callback=callback) - self.assertEqual(len(gone), 3) - self.assertEqual(len(alive), 0) - return gone, alive - - sproc1.terminate() - sproc2.terminate() - gone, alive = test(procs, callback) - self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid])) - for p in gone: - self.assertTrue(hasattr(p, 'returncode')) - - def test_wait_procs_no_timeout(self): - sproc1 = get_test_subprocess() - sproc2 = get_test_subprocess() - sproc3 = get_test_subprocess() - procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)] - for p in procs: - p.terminate() - gone, alive = psutil.wait_procs(procs) - - def test_boot_time(self): - bt = psutil.boot_time() - self.assertIsInstance(bt, float) - self.assertGreater(bt, 0) - self.assertLess(bt, time.time()) - - @unittest.skipUnless(POSIX, 'posix only') - def test_PAGESIZE(self): - # pagesize is used internally to perform different calculations - # and it's determined by using SC_PAGE_SIZE; make sure - # getpagesize() returns the same value. - import resource - self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize()) - - def test_virtual_memory(self): - mem = psutil.virtual_memory() - assert mem.total > 0, mem - assert mem.available > 0, mem - assert 0 <= mem.percent <= 100, mem - assert mem.used > 0, mem - assert mem.free >= 0, mem - for name in mem._fields: - value = getattr(mem, name) - if name != 'percent': - self.assertIsInstance(value, (int, long)) - if name != 'total': - if not value >= 0: - self.fail("%r < 0 (%s)" % (name, value)) - if value > mem.total: - self.fail("%r > total (total=%s, %s=%s)" - % (name, mem.total, name, value)) - - def test_swap_memory(self): - mem = psutil.swap_memory() - assert mem.total >= 0, mem - assert mem.used >= 0, mem - if mem.total > 0: - # likely a system with no swap partition - assert mem.free > 0, mem - else: - assert mem.free == 0, mem - assert 0 <= mem.percent <= 100, mem - assert mem.sin >= 0, mem - assert mem.sout >= 0, mem - - def test_pid_exists(self): - sproc = get_test_subprocess(wait=True) - self.assertTrue(psutil.pid_exists(sproc.pid)) - p = psutil.Process(sproc.pid) - p.kill() - p.wait() - self.assertFalse(psutil.pid_exists(sproc.pid)) - self.assertFalse(psutil.pid_exists(-1)) - self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids()) - # pid 0 - psutil.pid_exists(0) == 0 in psutil.pids() - - def test_pid_exists_2(self): - reap_children() - pids = psutil.pids() - for pid in pids: - try: - assert psutil.pid_exists(pid) - except AssertionError: - # in case the process disappeared in meantime fail only - # if it is no longer in psutil.pids() - time.sleep(.1) - if pid in psutil.pids(): - self.fail(pid) - pids = range(max(pids) + 5000, max(pids) + 6000) - for pid in pids: - self.assertFalse(psutil.pid_exists(pid), msg=pid) - - def test_pids(self): - plist = [x.pid for x in psutil.process_iter()] - pidlist = psutil.pids() - self.assertEqual(plist.sort(), pidlist.sort()) - # make sure every pid is unique - self.assertEqual(len(pidlist), len(set(pidlist))) - - def test_test(self): - # test for psutil.test() function - stdout = sys.stdout - sys.stdout = DEVNULL - try: - psutil.test() - finally: - sys.stdout = stdout - - def test_cpu_count(self): - logical = psutil.cpu_count() - self.assertEqual(logical, len(psutil.cpu_times(percpu=True))) - self.assertGreaterEqual(logical, 1) - # - if LINUX: - with open("/proc/cpuinfo") as fd: - cpuinfo_data = fd.read() - if "physical id" not in cpuinfo_data: - raise unittest.SkipTest("cpuinfo doesn't include physical id") - physical = psutil.cpu_count(logical=False) - self.assertGreaterEqual(physical, 1) - self.assertGreaterEqual(logical, physical) - - def test_sys_cpu_times(self): - total = 0 - times = psutil.cpu_times() - sum(times) - for cp_time in times: - self.assertIsInstance(cp_time, float) - self.assertGreaterEqual(cp_time, 0.0) - total += cp_time - self.assertEqual(total, sum(times)) - str(times) - if not WINDOWS: - # CPU times are always supposed to increase over time or - # remain the same but never go backwards, see: - # https://github.com/giampaolo/psutil/issues/392 - last = psutil.cpu_times() - for x in range(100): - new = psutil.cpu_times() - for field in new._fields: - new_t = getattr(new, field) - last_t = getattr(last, field) - self.assertGreaterEqual(new_t, last_t, - msg="%s %s" % (new_t, last_t)) - last = new - - def test_sys_cpu_times2(self): - t1 = sum(psutil.cpu_times()) - time.sleep(0.1) - t2 = sum(psutil.cpu_times()) - difference = t2 - t1 - if not difference >= 0.05: - self.fail("difference %s" % difference) - - def test_sys_per_cpu_times(self): - for times in psutil.cpu_times(percpu=True): - total = 0 - sum(times) - for cp_time in times: - self.assertIsInstance(cp_time, float) - self.assertGreaterEqual(cp_time, 0.0) - total += cp_time - self.assertEqual(total, sum(times)) - str(times) - self.assertEqual(len(psutil.cpu_times(percpu=True)[0]), - len(psutil.cpu_times(percpu=False))) - - # Note: in theory CPU times are always supposed to increase over - # time or remain the same but never go backwards. In practice - # sometimes this is not the case. - # This issue seemd to be afflict Windows: - # https://github.com/giampaolo/psutil/issues/392 - # ...but it turns out also Linux (rarely) behaves the same. - # last = psutil.cpu_times(percpu=True) - # for x in range(100): - # new = psutil.cpu_times(percpu=True) - # for index in range(len(new)): - # newcpu = new[index] - # lastcpu = last[index] - # for field in newcpu._fields: - # new_t = getattr(newcpu, field) - # last_t = getattr(lastcpu, field) - # self.assertGreaterEqual( - # new_t, last_t, msg="%s %s" % (lastcpu, newcpu)) - # last = new - - def test_sys_per_cpu_times_2(self): - tot1 = psutil.cpu_times(percpu=True) - stop_at = time.time() + 0.1 - while True: - if time.time() >= stop_at: - break - tot2 = psutil.cpu_times(percpu=True) - for t1, t2 in zip(tot1, tot2): - t1, t2 = sum(t1), sum(t2) - difference = t2 - t1 - if difference >= 0.05: - return - self.fail() - - def _test_cpu_percent(self, percent, last_ret, new_ret): - try: - self.assertIsInstance(percent, float) - self.assertGreaterEqual(percent, 0.0) - self.assertIsNot(percent, -0.0) - self.assertLessEqual(percent, 100.0 * psutil.cpu_count()) - except AssertionError as err: - raise AssertionError("\n%s\nlast=%s\nnew=%s" % ( - err, pprint.pformat(last_ret), pprint.pformat(new_ret))) - - def test_sys_cpu_percent(self): - last = psutil.cpu_percent(interval=0.001) - for x in range(100): - new = psutil.cpu_percent(interval=None) - self._test_cpu_percent(new, last, new) - last = new - - def test_sys_per_cpu_percent(self): - last = psutil.cpu_percent(interval=0.001, percpu=True) - self.assertEqual(len(last), psutil.cpu_count()) - for x in range(100): - new = psutil.cpu_percent(interval=None, percpu=True) - for percent in new: - self._test_cpu_percent(percent, last, new) - last = new - - def test_sys_cpu_times_percent(self): - last = psutil.cpu_times_percent(interval=0.001) - for x in range(100): - new = psutil.cpu_times_percent(interval=None) - for percent in new: - self._test_cpu_percent(percent, last, new) - self._test_cpu_percent(sum(new), last, new) - last = new - - def test_sys_per_cpu_times_percent(self): - last = psutil.cpu_times_percent(interval=0.001, percpu=True) - self.assertEqual(len(last), psutil.cpu_count()) - for x in range(100): - new = psutil.cpu_times_percent(interval=None, percpu=True) - for cpu in new: - for percent in cpu: - self._test_cpu_percent(percent, last, new) - self._test_cpu_percent(sum(cpu), last, new) - last = new - - def test_sys_per_cpu_times_percent_negative(self): - # see: https://github.com/giampaolo/psutil/issues/645 - psutil.cpu_times_percent(percpu=True) - zero_times = [x._make([0 for x in range(len(x._fields))]) - for x in psutil.cpu_times(percpu=True)] - with mock.patch('psutil.cpu_times', return_value=zero_times): - for cpu in psutil.cpu_times_percent(percpu=True): - for percent in cpu: - self._test_cpu_percent(percent, None, None) - - @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), - "os.statvfs() function not available on this platform") - def test_disk_usage(self): - usage = psutil.disk_usage(os.getcwd()) - assert usage.total > 0, usage - assert usage.used > 0, usage - assert usage.free > 0, usage - assert usage.total > usage.used, usage - assert usage.total > usage.free, usage - assert 0 <= usage.percent <= 100, usage.percent - if hasattr(shutil, 'disk_usage'): - # py >= 3.3, see: http://bugs.python.org/issue12442 - shutil_usage = shutil.disk_usage(os.getcwd()) - tolerance = 5 * 1024 * 1024 # 5MB - self.assertEqual(usage.total, shutil_usage.total) - self.assertAlmostEqual(usage.free, shutil_usage.free, - delta=tolerance) - self.assertAlmostEqual(usage.used, shutil_usage.used, - delta=tolerance) - - # if path does not exist OSError ENOENT is expected across - # all platforms - fname = tempfile.mktemp() - try: - psutil.disk_usage(fname) - except OSError as err: - if err.args[0] != errno.ENOENT: - raise - else: - self.fail("OSError not raised") - - @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), - "os.statvfs() function not available on this platform") - def test_disk_usage_unicode(self): - # see: https://github.com/giampaolo/psutil/issues/416 - # XXX this test is not really reliable as it always fails on - # Python 3.X (2.X is fine) - try: - safe_rmdir(TESTFN_UNICODE) - os.mkdir(TESTFN_UNICODE) - psutil.disk_usage(TESTFN_UNICODE) - safe_rmdir(TESTFN_UNICODE) - except UnicodeEncodeError: - pass - - @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'), - "os.statvfs() function not available on this platform") - @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis") - def test_disk_partitions(self): - # all = False - ls = psutil.disk_partitions(all=False) - # on travis we get: - # self.assertEqual(p.cpu_affinity(), [n]) - # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0] - self.assertTrue(ls, msg=ls) - for disk in ls: - if WINDOWS and 'cdrom' in disk.opts: - continue - if not POSIX: - assert os.path.exists(disk.device), disk - else: - # we cannot make any assumption about this, see: - # http://goo.gl/p9c43 - disk.device - if SUNOS: - # on solaris apparently mount points can also be files - assert os.path.exists(disk.mountpoint), disk - else: - assert os.path.isdir(disk.mountpoint), disk - assert disk.fstype, disk - self.assertIsInstance(disk.opts, str) - - # all = True - ls = psutil.disk_partitions(all=True) - self.assertTrue(ls, msg=ls) - for disk in psutil.disk_partitions(all=True): - if not WINDOWS: - try: - os.stat(disk.mountpoint) - except OSError as err: - # http://mail.python.org/pipermail/python-dev/ - # 2012-June/120787.html - if err.errno not in (errno.EPERM, errno.EACCES): - raise - else: - if SUNOS: - # on solaris apparently mount points can also be files - assert os.path.exists(disk.mountpoint), disk - else: - assert os.path.isdir(disk.mountpoint), disk - self.assertIsInstance(disk.fstype, str) - self.assertIsInstance(disk.opts, str) - - def find_mount_point(path): - path = os.path.abspath(path) - while not os.path.ismount(path): - path = os.path.dirname(path) - return path - - mount = find_mount_point(__file__) - mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)] - self.assertIn(mount, mounts) - psutil.disk_usage(mount) - - @skip_on_access_denied() - def test_net_connections(self): - def check(cons, families, types_): - for conn in cons: - self.assertIn(conn.family, families, msg=conn) - if conn.family != getattr(socket, 'AF_UNIX', object()): - self.assertIn(conn.type, types_, msg=conn) - - from psutil._common import conn_tmap - for kind, groups in conn_tmap.items(): - if SUNOS and kind == 'unix': - continue - families, types_ = groups - cons = psutil.net_connections(kind) - self.assertEqual(len(cons), len(set(cons))) - check(cons, families, types_) - - def test_net_io_counters(self): - def check_ntuple(nt): - self.assertEqual(nt[0], nt.bytes_sent) - self.assertEqual(nt[1], nt.bytes_recv) - self.assertEqual(nt[2], nt.packets_sent) - self.assertEqual(nt[3], nt.packets_recv) - self.assertEqual(nt[4], nt.errin) - self.assertEqual(nt[5], nt.errout) - self.assertEqual(nt[6], nt.dropin) - self.assertEqual(nt[7], nt.dropout) - assert nt.bytes_sent >= 0, nt - assert nt.bytes_recv >= 0, nt - assert nt.packets_sent >= 0, nt - assert nt.packets_recv >= 0, nt - assert nt.errin >= 0, nt - assert nt.errout >= 0, nt - assert nt.dropin >= 0, nt - assert nt.dropout >= 0, nt - - ret = psutil.net_io_counters(pernic=False) - check_ntuple(ret) - ret = psutil.net_io_counters(pernic=True) - self.assertNotEqual(ret, []) - for key in ret: - self.assertTrue(key) - check_ntuple(ret[key]) - - def test_net_if_addrs(self): - nics = psutil.net_if_addrs() - assert nics, nics - - # Not reliable on all platforms (net_if_addrs() reports more - # interfaces). - # self.assertEqual(sorted(nics.keys()), - # sorted(psutil.net_io_counters(pernic=True).keys())) - - families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK]) - for nic, addrs in nics.items(): - self.assertEqual(len(set(addrs)), len(addrs)) - for addr in addrs: - self.assertIsInstance(addr.family, int) - self.assertIsInstance(addr.address, str) - self.assertIsInstance(addr.netmask, (str, type(None))) - self.assertIsInstance(addr.broadcast, (str, type(None))) - self.assertIn(addr.family, families) - if sys.version_info >= (3, 4): - self.assertIsInstance(addr.family, enum.IntEnum) - if addr.family == socket.AF_INET: - s = socket.socket(addr.family) - with contextlib.closing(s): - s.bind((addr.address, 0)) - elif addr.family == socket.AF_INET6: - info = socket.getaddrinfo( - addr.address, 0, socket.AF_INET6, socket.SOCK_STREAM, - 0, socket.AI_PASSIVE)[0] - af, socktype, proto, canonname, sa = info - s = socket.socket(af, socktype, proto) - with contextlib.closing(s): - s.bind(sa) - for ip in (addr.address, addr.netmask, addr.broadcast): - if ip is not None: - # TODO: skip AF_INET6 for now because I get: - # AddressValueError: Only hex digits permitted in - # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0' - if addr.family != AF_INET6: - check_ip_address(ip, addr.family) - - if BSD or OSX or SUNOS: - if hasattr(socket, "AF_LINK"): - self.assertEqual(psutil.AF_LINK, socket.AF_LINK) - elif LINUX: - self.assertEqual(psutil.AF_LINK, socket.AF_PACKET) - elif WINDOWS: - self.assertEqual(psutil.AF_LINK, -1) - - @unittest.skipIf(TRAVIS, "EPERM on travis") - def test_net_if_stats(self): - nics = psutil.net_if_stats() - assert nics, nics - all_duplexes = (psutil.NIC_DUPLEX_FULL, - psutil.NIC_DUPLEX_HALF, - psutil.NIC_DUPLEX_UNKNOWN) - for nic, stats in nics.items(): - isup, duplex, speed, mtu = stats - self.assertIsInstance(isup, bool) - self.assertIn(duplex, all_duplexes) - self.assertIn(duplex, all_duplexes) - self.assertGreaterEqual(speed, 0) - self.assertGreaterEqual(mtu, 0) - - @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'), - '/proc/diskstats not available on this linux version') - @unittest.skipIf(APPVEYOR, - "can't find any physical disk on Appveyor") - def test_disk_io_counters(self): - def check_ntuple(nt): - self.assertEqual(nt[0], nt.read_count) - self.assertEqual(nt[1], nt.write_count) - self.assertEqual(nt[2], nt.read_bytes) - self.assertEqual(nt[3], nt.write_bytes) - self.assertEqual(nt[4], nt.read_time) - self.assertEqual(nt[5], nt.write_time) - assert nt.read_count >= 0, nt - assert nt.write_count >= 0, nt - assert nt.read_bytes >= 0, nt - assert nt.write_bytes >= 0, nt - assert nt.read_time >= 0, nt - assert nt.write_time >= 0, nt - - ret = psutil.disk_io_counters(perdisk=False) - check_ntuple(ret) - ret = psutil.disk_io_counters(perdisk=True) - # make sure there are no duplicates - self.assertEqual(len(ret), len(set(ret))) - for key in ret: - assert key, key - check_ntuple(ret[key]) - if LINUX and key[-1].isdigit(): - # if 'sda1' is listed 'sda' shouldn't, see: - # https://github.com/giampaolo/psutil/issues/338 - while key[-1].isdigit(): - key = key[:-1] - self.assertNotIn(key, ret.keys()) - - def test_users(self): - users = psutil.users() - if not APPVEYOR: - self.assertNotEqual(users, []) - for user in users: - assert user.name, user - user.terminal - user.host - assert user.started > 0.0, user - datetime.datetime.fromtimestamp(user.started) - - -# =================================================================== -# --- psutil.Process class tests -# =================================================================== - -class TestProcess(unittest.TestCase): - """Tests for psutil.Process class.""" - - def setUp(self): - safe_remove(TESTFN) - - def tearDown(self): - reap_children() - - def test_pid(self): - self.assertEqual(psutil.Process().pid, os.getpid()) - sproc = get_test_subprocess() - self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid) - - def test_kill(self): - sproc = get_test_subprocess(wait=True) - test_pid = sproc.pid - p = psutil.Process(test_pid) - p.kill() - sig = p.wait() - self.assertFalse(psutil.pid_exists(test_pid)) - if POSIX: - self.assertEqual(sig, signal.SIGKILL) - - def test_terminate(self): - sproc = get_test_subprocess(wait=True) - test_pid = sproc.pid - p = psutil.Process(test_pid) - p.terminate() - sig = p.wait() - self.assertFalse(psutil.pid_exists(test_pid)) - if POSIX: - self.assertEqual(sig, signal.SIGTERM) - - def test_send_signal(self): - sig = signal.SIGKILL if POSIX else signal.SIGTERM - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.send_signal(sig) - exit_sig = p.wait() - self.assertFalse(psutil.pid_exists(p.pid)) - if POSIX: - self.assertEqual(exit_sig, sig) - # - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.send_signal(sig) - with mock.patch('psutil.os.kill', - side_effect=OSError(errno.ESRCH, "")) as fun: - with self.assertRaises(psutil.NoSuchProcess): - p.send_signal(sig) - assert fun.called - # - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.send_signal(sig) - with mock.patch('psutil.os.kill', - side_effect=OSError(errno.EPERM, "")) as fun: - with self.assertRaises(psutil.AccessDenied): - p.send_signal(sig) - assert fun.called - - def test_wait(self): - # check exit code signal - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.kill() - code = p.wait() - if POSIX: - self.assertEqual(code, signal.SIGKILL) - else: - self.assertEqual(code, 0) - self.assertFalse(p.is_running()) - - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.terminate() - code = p.wait() - if POSIX: - self.assertEqual(code, signal.SIGTERM) - else: - self.assertEqual(code, 0) - self.assertFalse(p.is_running()) - - # check sys.exit() code - code = "import time, sys; time.sleep(0.01); sys.exit(5);" - sproc = get_test_subprocess([PYTHON, "-c", code]) - p = psutil.Process(sproc.pid) - self.assertEqual(p.wait(), 5) - self.assertFalse(p.is_running()) - - # Test wait() issued twice. - # It is not supposed to raise NSP when the process is gone. - # On UNIX this should return None, on Windows it should keep - # returning the exit code. - sproc = get_test_subprocess([PYTHON, "-c", code]) - p = psutil.Process(sproc.pid) - self.assertEqual(p.wait(), 5) - self.assertIn(p.wait(), (5, None)) - - # test timeout - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.name() - self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01) - - # timeout < 0 not allowed - self.assertRaises(ValueError, p.wait, -1) - - # XXX why is this skipped on Windows? - @unittest.skipUnless(POSIX, 'skipped on Windows') - def test_wait_non_children(self): - # test wait() against processes which are not our children - code = "import sys;" - code += "from subprocess import Popen, PIPE;" - code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON - code += "sp = Popen(cmd, stdout=PIPE);" - code += "sys.stdout.write(str(sp.pid));" - sproc = get_test_subprocess([PYTHON, "-c", code], - stdout=subprocess.PIPE) - grandson_pid = int(sproc.stdout.read()) - grandson_proc = psutil.Process(grandson_pid) - try: - self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01) - grandson_proc.kill() - ret = grandson_proc.wait() - self.assertEqual(ret, None) - finally: - if grandson_proc.is_running(): - grandson_proc.kill() - grandson_proc.wait() - - def test_wait_timeout_0(self): - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - self.assertRaises(psutil.TimeoutExpired, p.wait, 0) - p.kill() - stop_at = time.time() + 2 - while True: - try: - code = p.wait(0) - except psutil.TimeoutExpired: - if time.time() >= stop_at: - raise - else: - break - if POSIX: - self.assertEqual(code, signal.SIGKILL) - else: - self.assertEqual(code, 0) - self.assertFalse(p.is_running()) - - def test_cpu_percent(self): - p = psutil.Process() - p.cpu_percent(interval=0.001) - p.cpu_percent(interval=0.001) - for x in range(100): - percent = p.cpu_percent(interval=None) - self.assertIsInstance(percent, float) - self.assertGreaterEqual(percent, 0.0) - if not POSIX: - self.assertLessEqual(percent, 100.0) - else: - self.assertGreaterEqual(percent, 0.0) - - def test_cpu_times(self): - times = psutil.Process().cpu_times() - assert (times.user > 0.0) or (times.system > 0.0), times - # make sure returned values can be pretty printed with strftime - time.strftime("%H:%M:%S", time.localtime(times.user)) - time.strftime("%H:%M:%S", time.localtime(times.system)) - - # Test Process.cpu_times() against os.times() - # os.times() is broken on Python 2.6 - # http://bugs.python.org/issue1040026 - # XXX fails on OSX: not sure if it's for os.times(). We should - # try this with Python 2.7 and re-enable the test. - - @unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX, - 'os.times() is not reliable on this Python version') - def test_cpu_times2(self): - user_time, kernel_time = psutil.Process().cpu_times() - utime, ktime = os.times()[:2] - - # Use os.times()[:2] as base values to compare our results - # using a tolerance of +/- 0.1 seconds. - # It will fail if the difference between the values is > 0.1s. - if (max([user_time, utime]) - min([user_time, utime])) > 0.1: - self.fail("expected: %s, found: %s" % (utime, user_time)) - - if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1: - self.fail("expected: %s, found: %s" % (ktime, kernel_time)) - - def test_create_time(self): - sproc = get_test_subprocess(wait=True) - now = time.time() - p = psutil.Process(sproc.pid) - create_time = p.create_time() - - # Use time.time() as base value to compare our result using a - # tolerance of +/- 1 second. - # It will fail if the difference between the values is > 2s. - difference = abs(create_time - now) - if difference > 2: - self.fail("expected: %s, found: %s, difference: %s" - % (now, create_time, difference)) - - # make sure returned value can be pretty printed with strftime - time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time())) - - @unittest.skipIf(WINDOWS, 'Windows only') - def test_terminal(self): - terminal = psutil.Process().terminal() - if sys.stdin.isatty(): - self.assertEqual(terminal, sh('tty')) - else: - assert terminal, repr(terminal) - - @unittest.skipUnless(LINUX or BSD or WINDOWS, - 'not available on this platform') - @skip_on_not_implemented(only_if=LINUX) - def test_io_counters(self): - p = psutil.Process() - # test reads - io1 = p.io_counters() - with open(PYTHON, 'rb') as f: - f.read() - io2 = p.io_counters() - if not BSD: - assert io2.read_count > io1.read_count, (io1, io2) - self.assertEqual(io2.write_count, io1.write_count) - assert io2.read_bytes >= io1.read_bytes, (io1, io2) - assert io2.write_bytes >= io1.write_bytes, (io1, io2) - # test writes - io1 = p.io_counters() - with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f: - if PY3: - f.write(bytes("x" * 1000000, 'ascii')) - else: - f.write("x" * 1000000) - io2 = p.io_counters() - assert io2.write_count >= io1.write_count, (io1, io2) - assert io2.write_bytes >= io1.write_bytes, (io1, io2) - assert io2.read_count >= io1.read_count, (io1, io2) - assert io2.read_bytes >= io1.read_bytes, (io1, io2) - - @unittest.skipUnless(LINUX or (WINDOWS and get_winver() >= WIN_VISTA), - 'Linux and Windows Vista only') - @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis") - def test_ionice(self): - if LINUX: - from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT, - IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE) - self.assertEqual(IOPRIO_CLASS_NONE, 0) - self.assertEqual(IOPRIO_CLASS_RT, 1) - self.assertEqual(IOPRIO_CLASS_BE, 2) - self.assertEqual(IOPRIO_CLASS_IDLE, 3) - p = psutil.Process() - try: - p.ionice(2) - ioclass, value = p.ionice() - if enum is not None: - self.assertIsInstance(ioclass, enum.IntEnum) - self.assertEqual(ioclass, 2) - self.assertEqual(value, 4) - # - p.ionice(3) - ioclass, value = p.ionice() - self.assertEqual(ioclass, 3) - self.assertEqual(value, 0) - # - p.ionice(2, 0) - ioclass, value = p.ionice() - self.assertEqual(ioclass, 2) - self.assertEqual(value, 0) - p.ionice(2, 7) - ioclass, value = p.ionice() - self.assertEqual(ioclass, 2) - self.assertEqual(value, 7) - # - self.assertRaises(ValueError, p.ionice, 2, 10) - self.assertRaises(ValueError, p.ionice, 2, -1) - self.assertRaises(ValueError, p.ionice, 4) - self.assertRaises(TypeError, p.ionice, 2, "foo") - self.assertRaisesRegexp( - ValueError, "can't specify value with IOPRIO_CLASS_NONE", - p.ionice, psutil.IOPRIO_CLASS_NONE, 1) - self.assertRaisesRegexp( - ValueError, "can't specify value with IOPRIO_CLASS_IDLE", - p.ionice, psutil.IOPRIO_CLASS_IDLE, 1) - self.assertRaisesRegexp( - ValueError, "'ioclass' argument must be specified", - p.ionice, value=1) - finally: - p.ionice(IOPRIO_CLASS_NONE) - else: - p = psutil.Process() - original = p.ionice() - self.assertIsInstance(original, int) - try: - value = 0 # very low - if original == value: - value = 1 # low - p.ionice(value) - self.assertEqual(p.ionice(), value) - finally: - p.ionice(original) - # - self.assertRaises(ValueError, p.ionice, 3) - self.assertRaises(TypeError, p.ionice, 2, 1) - - @unittest.skipUnless(LINUX and RLIMIT_SUPPORT, - "only available on Linux >= 2.6.36") - def test_rlimit_get(self): - import resource - p = psutil.Process(os.getpid()) - names = [x for x in dir(psutil) if x.startswith('RLIMIT')] - assert names, names - for name in names: - value = getattr(psutil, name) - self.assertGreaterEqual(value, 0) - if name in dir(resource): - self.assertEqual(value, getattr(resource, name)) - self.assertEqual(p.rlimit(value), resource.getrlimit(value)) - else: - ret = p.rlimit(value) - self.assertEqual(len(ret), 2) - self.assertGreaterEqual(ret[0], -1) - self.assertGreaterEqual(ret[1], -1) - - @unittest.skipUnless(LINUX and RLIMIT_SUPPORT, - "only available on Linux >= 2.6.36") - def test_rlimit_set(self): - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.rlimit(psutil.RLIMIT_NOFILE, (5, 5)) - self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5)) - # If pid is 0 prlimit() applies to the calling process and - # we don't want that. - with self.assertRaises(ValueError): - psutil._psplatform.Process(0).rlimit(0) - with self.assertRaises(ValueError): - p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5)) - - def test_num_threads(self): - # on certain platforms such as Linux we might test for exact - # thread number, since we always have with 1 thread per process, - # but this does not apply across all platforms (OSX, Windows) - p = psutil.Process() - step1 = p.num_threads() - - thread = ThreadTask() - thread.start() - try: - step2 = p.num_threads() - self.assertEqual(step2, step1 + 1) - thread.stop() - finally: - if thread._running: - thread.stop() - - @unittest.skipUnless(WINDOWS, 'Windows only') - def test_num_handles(self): - # a better test is done later into test/_windows.py - p = psutil.Process() - self.assertGreater(p.num_handles(), 0) - - def test_threads(self): - p = psutil.Process() - step1 = p.threads() - - thread = ThreadTask() - thread.start() - - try: - step2 = p.threads() - self.assertEqual(len(step2), len(step1) + 1) - # on Linux, first thread id is supposed to be this process - if LINUX: - self.assertEqual(step2[0].id, os.getpid()) - athread = step2[0] - # test named tuple - self.assertEqual(athread.id, athread[0]) - self.assertEqual(athread.user_time, athread[1]) - self.assertEqual(athread.system_time, athread[2]) - # test num threads - thread.stop() - finally: - if thread._running: - thread.stop() - - def test_memory_info(self): - p = psutil.Process() - - # step 1 - get a base value to compare our results - rss1, vms1 = p.memory_info() - percent1 = p.memory_percent() - self.assertGreater(rss1, 0) - self.assertGreater(vms1, 0) - - # step 2 - allocate some memory - memarr = [None] * 1500000 - - rss2, vms2 = p.memory_info() - percent2 = p.memory_percent() - # make sure that the memory usage bumped up - self.assertGreater(rss2, rss1) - self.assertGreaterEqual(vms2, vms1) # vms might be equal - self.assertGreater(percent2, percent1) - del memarr - - # def test_memory_info_ex(self): - # # tested later in fetch all test suite - - def test_memory_maps(self): - p = psutil.Process() - maps = p.memory_maps() - paths = [x for x in maps] - self.assertEqual(len(paths), len(set(paths))) - ext_maps = p.memory_maps(grouped=False) - - for nt in maps: - if not nt.path.startswith('['): - assert os.path.isabs(nt.path), nt.path - if POSIX: - assert os.path.exists(nt.path), nt.path - else: - # XXX - On Windows we have this strange behavior with - # 64 bit dlls: they are visible via explorer but cannot - # be accessed via os.stat() (wtf?). - if '64' not in os.path.basename(nt.path): - assert os.path.exists(nt.path), nt.path - for nt in ext_maps: - for fname in nt._fields: - value = getattr(nt, fname) - if fname == 'path': - continue - elif fname in ('addr', 'perms'): - assert value, value - else: - self.assertIsInstance(value, (int, long)) - assert value >= 0, value - - def test_memory_percent(self): - p = psutil.Process() - self.assertGreater(p.memory_percent(), 0.0) - - def test_is_running(self): - sproc = get_test_subprocess(wait=True) - p = psutil.Process(sproc.pid) - assert p.is_running() - assert p.is_running() - p.kill() - p.wait() - assert not p.is_running() - assert not p.is_running() - - def test_exe(self): - sproc = get_test_subprocess(wait=True) - exe = psutil.Process(sproc.pid).exe() - try: - self.assertEqual(exe, PYTHON) - except AssertionError: - if WINDOWS and len(exe) == len(PYTHON): - # on Windows we don't care about case sensitivity - self.assertEqual(exe.lower(), PYTHON.lower()) - else: - # certain platforms such as BSD are more accurate returning: - # "/usr/local/bin/python2.7" - # ...instead of: - # "/usr/local/bin/python" - # We do not want to consider this difference in accuracy - # an error. - ver = "%s.%s" % (sys.version_info[0], sys.version_info[1]) - self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, '')) - - def test_cmdline(self): - cmdline = [PYTHON, "-c", "import time; time.sleep(60)"] - sproc = get_test_subprocess(cmdline, wait=True) - self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()), - ' '.join(cmdline)) - - def test_name(self): - sproc = get_test_subprocess(PYTHON, wait=True) - name = psutil.Process(sproc.pid).name().lower() - pyexe = os.path.basename(os.path.realpath(sys.executable)).lower() - assert pyexe.startswith(name), (pyexe, name) - - @unittest.skipUnless(POSIX, "posix only") - # TODO: add support for other compilers - @unittest.skipUnless(which("gcc"), "gcc not available") - def test_prog_w_funky_name(self): - # Test that name(), exe() and cmdline() correctly handle programs - # with funky chars such as spaces and ")", see: - # https://github.com/giampaolo/psutil/issues/628 - funky_name = "/tmp/foo bar )" - _, c_file = tempfile.mkstemp(prefix='psutil-', suffix='.c', dir="/tmp") - self.addCleanup(lambda: safe_remove(c_file)) - self.addCleanup(lambda: safe_remove(funky_name)) - with open(c_file, "w") as f: - f.write("void main() { pause(); }") - subprocess.check_call(["gcc", c_file, "-o", funky_name]) - sproc = get_test_subprocess( - [funky_name, "arg1", "arg2", "", "arg3", ""]) - p = psutil.Process(sproc.pid) - # ...in order to try to prevent occasional failures on travis - wait_for_pid(p.pid) - self.assertEqual(p.name(), "foo bar )") - self.assertEqual(p.exe(), "/tmp/foo bar )") - self.assertEqual( - p.cmdline(), ["/tmp/foo bar )", "arg1", "arg2", "", "arg3", ""]) - - @unittest.skipUnless(POSIX, 'posix only') - def test_uids(self): - p = psutil.Process() - real, effective, saved = p.uids() - # os.getuid() refers to "real" uid - self.assertEqual(real, os.getuid()) - # os.geteuid() refers to "effective" uid - self.assertEqual(effective, os.geteuid()) - # no such thing as os.getsuid() ("saved" uid), but starting - # from python 2.7 we have os.getresuid()[2] - if hasattr(os, "getresuid"): - self.assertEqual(saved, os.getresuid()[2]) - - @unittest.skipUnless(POSIX, 'posix only') - def test_gids(self): - p = psutil.Process() - real, effective, saved = p.gids() - # os.getuid() refers to "real" uid - self.assertEqual(real, os.getgid()) - # os.geteuid() refers to "effective" uid - self.assertEqual(effective, os.getegid()) - # no such thing as os.getsuid() ("saved" uid), but starting - # from python 2.7 we have os.getresgid()[2] - if hasattr(os, "getresuid"): - self.assertEqual(saved, os.getresgid()[2]) - - def test_nice(self): - p = psutil.Process() - self.assertRaises(TypeError, p.nice, "str") - if WINDOWS: - try: - init = p.nice() - if sys.version_info > (3, 4): - self.assertIsInstance(init, enum.IntEnum) - else: - self.assertIsInstance(init, int) - self.assertEqual(init, psutil.NORMAL_PRIORITY_CLASS) - p.nice(psutil.HIGH_PRIORITY_CLASS) - self.assertEqual(p.nice(), psutil.HIGH_PRIORITY_CLASS) - p.nice(psutil.NORMAL_PRIORITY_CLASS) - self.assertEqual(p.nice(), psutil.NORMAL_PRIORITY_CLASS) - finally: - p.nice(psutil.NORMAL_PRIORITY_CLASS) - else: - try: - first_nice = p.nice() - p.nice(1) - self.assertEqual(p.nice(), 1) - # going back to previous nice value raises - # AccessDenied on OSX - if not OSX: - p.nice(0) - self.assertEqual(p.nice(), 0) - except psutil.AccessDenied: - pass - finally: - try: - p.nice(first_nice) - except psutil.AccessDenied: - pass - - def test_status(self): - p = psutil.Process() - self.assertEqual(p.status(), psutil.STATUS_RUNNING) - - def test_username(self): - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - if POSIX: - import pwd - self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name) - with mock.patch("psutil.pwd.getpwuid", - side_effect=KeyError) as fun: - p.username() == str(p.uids().real) - assert fun.called - - elif WINDOWS and 'USERNAME' in os.environ: - expected_username = os.environ['USERNAME'] - expected_domain = os.environ['USERDOMAIN'] - domain, username = p.username().split('\\') - self.assertEqual(domain, expected_domain) - self.assertEqual(username, expected_username) - else: - p.username() - - def test_cwd(self): - sproc = get_test_subprocess(wait=True) - p = psutil.Process(sproc.pid) - self.assertEqual(p.cwd(), os.getcwd()) - - def test_cwd_2(self): - cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(60)"] - sproc = get_test_subprocess(cmd, wait=True) - p = psutil.Process(sproc.pid) - call_until(p.cwd, "ret == os.path.dirname(os.getcwd())") - - @unittest.skipUnless(WINDOWS or LINUX or BSD, - 'not available on this platform') - @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis") - def test_cpu_affinity(self): - p = psutil.Process() - initial = p.cpu_affinity() - if hasattr(os, "sched_getaffinity"): - self.assertEqual(initial, list(os.sched_getaffinity(p.pid))) - self.assertEqual(len(initial), len(set(initial))) - all_cpus = list(range(len(psutil.cpu_percent(percpu=True)))) - # setting on travis doesn't seem to work (always return all - # CPUs on get): - # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, ... != [0] - for n in all_cpus: - p.cpu_affinity([n]) - self.assertEqual(p.cpu_affinity(), [n]) - if hasattr(os, "sched_getaffinity"): - self.assertEqual(p.cpu_affinity(), - list(os.sched_getaffinity(p.pid))) - # - p.cpu_affinity(all_cpus) - self.assertEqual(p.cpu_affinity(), all_cpus) - if hasattr(os, "sched_getaffinity"): - self.assertEqual(p.cpu_affinity(), - list(os.sched_getaffinity(p.pid))) - # - self.assertRaises(TypeError, p.cpu_affinity, 1) - p.cpu_affinity(initial) - # it should work with all iterables, not only lists - p.cpu_affinity(set(all_cpus)) - p.cpu_affinity(tuple(all_cpus)) - invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10] - self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu) - self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000)) - self.assertRaises(TypeError, p.cpu_affinity, [0, "1"]) - - # TODO - @unittest.skipIf(BSD, "broken on BSD, see #595") - @unittest.skipIf(APPVEYOR, - "can't find any process file on Appveyor") - def test_open_files(self): - # current process - p = psutil.Process() - files = p.open_files() - self.assertFalse(TESTFN in files) - with open(TESTFN, 'w'): - # give the kernel some time to see the new file - call_until(p.open_files, "len(ret) != %i" % len(files)) - filenames = [x.path for x in p.open_files()] - self.assertIn(TESTFN, filenames) - for file in filenames: - assert os.path.isfile(file), file - - # another process - cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN - sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True) - p = psutil.Process(sproc.pid) - - for x in range(100): - filenames = [x.path for x in p.open_files()] - if TESTFN in filenames: - break - time.sleep(.01) - else: - self.assertIn(TESTFN, filenames) - for file in filenames: - assert os.path.isfile(file), file - - # TODO - @unittest.skipIf(BSD, "broken on BSD, see #595") - @unittest.skipIf(APPVEYOR, - "can't find any process file on Appveyor") - def test_open_files2(self): - # test fd and path fields - with open(TESTFN, 'w') as fileobj: - p = psutil.Process() - for path, fd in p.open_files(): - if path == fileobj.name or fd == fileobj.fileno(): - break - else: - self.fail("no file found; files=%s" % repr(p.open_files())) - self.assertEqual(path, fileobj.name) - if WINDOWS: - self.assertEqual(fd, -1) - else: - self.assertEqual(fd, fileobj.fileno()) - # test positions - ntuple = p.open_files()[0] - self.assertEqual(ntuple[0], ntuple.path) - self.assertEqual(ntuple[1], ntuple.fd) - # test file is gone - self.assertTrue(fileobj.name not in p.open_files()) - - def compare_proc_sys_cons(self, pid, proc_cons): - from psutil._common import pconn - sys_cons = [] - for c in psutil.net_connections(kind='all'): - if c.pid == pid: - sys_cons.append(pconn(*c[:-1])) - if BSD: - # on BSD all fds are set to -1 - proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons] - self.assertEqual(sorted(proc_cons), sorted(sys_cons)) - - @skip_on_access_denied(only_if=OSX) - def test_connections(self): - def check_conn(proc, conn, family, type, laddr, raddr, status, kinds): - all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4", - "tcp6", "udp", "udp4", "udp6") - check_connection_ntuple(conn) - self.assertEqual(conn.family, family) - self.assertEqual(conn.type, type) - self.assertEqual(conn.laddr, laddr) - self.assertEqual(conn.raddr, raddr) - self.assertEqual(conn.status, status) - for kind in all_kinds: - cons = proc.connections(kind=kind) - if kind in kinds: - self.assertNotEqual(cons, []) - else: - self.assertEqual(cons, []) - # compare against system-wide connections - # XXX Solaris can't retrieve system-wide UNIX - # sockets. - if not SUNOS: - self.compare_proc_sys_cons(proc.pid, [conn]) - - tcp_template = textwrap.dedent(""" - import socket, time - s = socket.socket($family, socket.SOCK_STREAM) - s.bind(('$addr', 0)) - s.listen(1) - with open('$testfn', 'w') as f: - f.write(str(s.getsockname()[:2])) - time.sleep(60) - """) - - udp_template = textwrap.dedent(""" - import socket, time - s = socket.socket($family, socket.SOCK_DGRAM) - s.bind(('$addr', 0)) - with open('$testfn', 'w') as f: - f.write(str(s.getsockname()[:2])) - time.sleep(60) - """) - - from string import Template - testfile = os.path.basename(TESTFN) - tcp4_template = Template(tcp_template).substitute( - family=int(AF_INET), addr="127.0.0.1", testfn=testfile) - udp4_template = Template(udp_template).substitute( - family=int(AF_INET), addr="127.0.0.1", testfn=testfile) - tcp6_template = Template(tcp_template).substitute( - family=int(AF_INET6), addr="::1", testfn=testfile) - udp6_template = Template(udp_template).substitute( - family=int(AF_INET6), addr="::1", testfn=testfile) - - # launch various subprocess instantiating a socket of various - # families and types to enrich psutil results - tcp4_proc = pyrun(tcp4_template) - tcp4_addr = eval(wait_for_file(testfile)) - udp4_proc = pyrun(udp4_template) - udp4_addr = eval(wait_for_file(testfile)) - if supports_ipv6(): - tcp6_proc = pyrun(tcp6_template) - tcp6_addr = eval(wait_for_file(testfile)) - udp6_proc = pyrun(udp6_template) - udp6_addr = eval(wait_for_file(testfile)) - else: - tcp6_proc = None - udp6_proc = None - tcp6_addr = None - udp6_addr = None - - for p in psutil.Process().children(): - cons = p.connections() - self.assertEqual(len(cons), 1) - for conn in cons: - # TCP v4 - if p.pid == tcp4_proc.pid: - check_conn(p, conn, AF_INET, SOCK_STREAM, tcp4_addr, (), - psutil.CONN_LISTEN, - ("all", "inet", "inet4", "tcp", "tcp4")) - # UDP v4 - elif p.pid == udp4_proc.pid: - check_conn(p, conn, AF_INET, SOCK_DGRAM, udp4_addr, (), - psutil.CONN_NONE, - ("all", "inet", "inet4", "udp", "udp4")) - # TCP v6 - elif p.pid == getattr(tcp6_proc, "pid", None): - check_conn(p, conn, AF_INET6, SOCK_STREAM, tcp6_addr, (), - psutil.CONN_LISTEN, - ("all", "inet", "inet6", "tcp", "tcp6")) - # UDP v6 - elif p.pid == getattr(udp6_proc, "pid", None): - check_conn(p, conn, AF_INET6, SOCK_DGRAM, udp6_addr, (), - psutil.CONN_NONE, - ("all", "inet", "inet6", "udp", "udp6")) - - @unittest.skipUnless(hasattr(socket, 'AF_UNIX'), - 'AF_UNIX is not supported') - @skip_on_access_denied(only_if=OSX) - def test_connections_unix(self): - def check(type): - safe_remove(TESTFN) - sock = socket.socket(AF_UNIX, type) - with contextlib.closing(sock): - sock.bind(TESTFN) - cons = psutil.Process().connections(kind='unix') - conn = cons[0] - check_connection_ntuple(conn) - if conn.fd != -1: # != sunos and windows - self.assertEqual(conn.fd, sock.fileno()) - self.assertEqual(conn.family, AF_UNIX) - self.assertEqual(conn.type, type) - self.assertEqual(conn.laddr, TESTFN) - if not SUNOS: - # XXX Solaris can't retrieve system-wide UNIX - # sockets. - self.compare_proc_sys_cons(os.getpid(), cons) - - check(SOCK_STREAM) - check(SOCK_DGRAM) - - @unittest.skipUnless(hasattr(socket, "fromfd"), - 'socket.fromfd() is not availble') - @unittest.skipIf(WINDOWS or SUNOS, - 'connection fd not available on this platform') - def test_connection_fromfd(self): - with contextlib.closing(socket.socket()) as sock: - sock.bind(('localhost', 0)) - sock.listen(1) - p = psutil.Process() - for conn in p.connections(): - if conn.fd == sock.fileno(): - break - else: - self.fail("couldn't find socket fd") - dupsock = socket.fromfd(conn.fd, conn.family, conn.type) - with contextlib.closing(dupsock): - self.assertEqual(dupsock.getsockname(), conn.laddr) - self.assertNotEqual(sock.fileno(), dupsock.fileno()) - - def test_connection_constants(self): - ints = [] - strs = [] - for name in dir(psutil): - if name.startswith('CONN_'): - num = getattr(psutil, name) - str_ = str(num) - assert str_.isupper(), str_ - assert str_ not in strs, str_ - assert num not in ints, num - ints.append(num) - strs.append(str_) - if SUNOS: - psutil.CONN_IDLE - psutil.CONN_BOUND - if WINDOWS: - psutil.CONN_DELETE_TCB - - @unittest.skipUnless(POSIX, 'posix only') - def test_num_fds(self): - p = psutil.Process() - start = p.num_fds() - file = open(TESTFN, 'w') - self.addCleanup(file.close) - self.assertEqual(p.num_fds(), start + 1) - sock = socket.socket() - self.addCleanup(sock.close) - self.assertEqual(p.num_fds(), start + 2) - file.close() - sock.close() - self.assertEqual(p.num_fds(), start) - - @skip_on_not_implemented(only_if=LINUX) - def test_num_ctx_switches(self): - p = psutil.Process() - before = sum(p.num_ctx_switches()) - for x in range(500000): - after = sum(p.num_ctx_switches()) - if after > before: - return - self.fail("num ctx switches still the same after 50.000 iterations") - - def test_parent_ppid(self): - this_parent = os.getpid() - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - self.assertEqual(p.ppid(), this_parent) - self.assertEqual(p.parent().pid, this_parent) - # no other process is supposed to have us as parent - for p in psutil.process_iter(): - if p.pid == sproc.pid: - continue - self.assertTrue(p.ppid() != this_parent) - - def test_children(self): - p = psutil.Process() - self.assertEqual(p.children(), []) - self.assertEqual(p.children(recursive=True), []) - sproc = get_test_subprocess() - children1 = p.children() - children2 = p.children(recursive=True) - for children in (children1, children2): - self.assertEqual(len(children), 1) - self.assertEqual(children[0].pid, sproc.pid) - self.assertEqual(children[0].ppid(), os.getpid()) - - def test_children_recursive(self): - # here we create a subprocess which creates another one as in: - # A (parent) -> B (child) -> C (grandchild) - s = "import subprocess, os, sys, time;" - s += "PYTHON = os.path.realpath(sys.executable);" - s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];" - s += "subprocess.Popen(cmd);" - s += "time.sleep(60);" - get_test_subprocess(cmd=[PYTHON, "-c", s]) - p = psutil.Process() - self.assertEqual(len(p.children(recursive=False)), 1) - # give the grandchild some time to start - stop_at = time.time() + GLOBAL_TIMEOUT - while time.time() < stop_at: - children = p.children(recursive=True) - if len(children) > 1: - break - self.assertEqual(len(children), 2) - self.assertEqual(children[0].ppid(), os.getpid()) - self.assertEqual(children[1].ppid(), children[0].pid) - - def test_children_duplicates(self): - # find the process which has the highest number of children - table = collections.defaultdict(int) - for p in psutil.process_iter(): - try: - table[p.ppid()] += 1 - except psutil.Error: - pass - # this is the one, now let's make sure there are no duplicates - pid = sorted(table.items(), key=lambda x: x[1])[-1][0] - p = psutil.Process(pid) - try: - c = p.children(recursive=True) - except psutil.AccessDenied: # windows - pass - else: - self.assertEqual(len(c), len(set(c))) - - def test_suspend_resume(self): - sproc = get_test_subprocess(wait=True) - p = psutil.Process(sproc.pid) - p.suspend() - for x in range(100): - if p.status() == psutil.STATUS_STOPPED: - break - time.sleep(0.01) - p.resume() - self.assertNotEqual(p.status(), psutil.STATUS_STOPPED) - - def test_invalid_pid(self): - self.assertRaises(TypeError, psutil.Process, "1") - self.assertRaises(ValueError, psutil.Process, -1) - - def test_as_dict(self): - p = psutil.Process() - d = p.as_dict(attrs=['exe', 'name']) - self.assertEqual(sorted(d.keys()), ['exe', 'name']) - - p = psutil.Process(min(psutil.pids())) - d = p.as_dict(attrs=['connections'], ad_value='foo') - if not isinstance(d['connections'], list): - self.assertEqual(d['connections'], 'foo') - - def test_halfway_terminated_process(self): - # Test that NoSuchProcess exception gets raised in case the - # process dies after we create the Process object. - # Example: - # >>> proc = Process(1234) - # >>> time.sleep(2) # time-consuming task, process dies in meantime - # >>> proc.name() - # Refers to Issue #15 - sproc = get_test_subprocess() - p = psutil.Process(sproc.pid) - p.terminate() - p.wait() - if WINDOWS: - wait_for_pid(p.pid) - self.assertFalse(p.is_running()) - self.assertFalse(p.pid in psutil.pids()) - - excluded_names = ['pid', 'is_running', 'wait', 'create_time'] - if LINUX and not RLIMIT_SUPPORT: - excluded_names.append('rlimit') - for name in dir(p): - if (name.startswith('_') or - name in excluded_names): - continue - try: - meth = getattr(p, name) - # get/set methods - if name == 'nice': - if POSIX: - ret = meth(1) - else: - ret = meth(psutil.NORMAL_PRIORITY_CLASS) - elif name == 'ionice': - ret = meth() - ret = meth(2) - elif name == 'rlimit': - ret = meth(psutil.RLIMIT_NOFILE) - ret = meth(psutil.RLIMIT_NOFILE, (5, 5)) - elif name == 'cpu_affinity': - ret = meth() - ret = meth([0]) - elif name == 'send_signal': - ret = meth(signal.SIGTERM) - else: - ret = meth() - except psutil.ZombieProcess: - self.fail("ZombieProcess for %r was not supposed to happen" % - name) - except psutil.NoSuchProcess: - pass - except NotImplementedError: - pass - else: - self.fail( - "NoSuchProcess exception not raised for %r, retval=%s" % ( - name, ret)) - - @unittest.skipUnless(POSIX, 'posix only') - def test_zombie_process(self): - def succeed_or_zombie_p_exc(fun, *args, **kwargs): - try: - fun(*args, **kwargs) - except (psutil.ZombieProcess, psutil.AccessDenied): - pass - - # Note: in this test we'll be creating two sub processes. - # Both of them are supposed to be freed / killed by - # reap_children() as they are attributable to 'us' - # (os.getpid()) via children(recursive=True). - src = textwrap.dedent("""\ - import os, sys, time, socket, contextlib - child_pid = os.fork() - if child_pid > 0: - time.sleep(3000) - else: - # this is the zombie process - s = socket.socket(socket.AF_UNIX) - with contextlib.closing(s): - s.connect('%s') - if sys.version_info < (3, ): - pid = str(os.getpid()) - else: - pid = bytes(str(os.getpid()), 'ascii') - s.sendall(pid) - """ % TESTFN) - with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock: - try: - sock.settimeout(GLOBAL_TIMEOUT) - sock.bind(TESTFN) - sock.listen(1) - pyrun(src) - conn, _ = sock.accept() - select.select([conn.fileno()], [], [], GLOBAL_TIMEOUT) - zpid = int(conn.recv(1024)) - zproc = psutil.Process(zpid) - call_until(lambda: zproc.status(), - "ret == psutil.STATUS_ZOMBIE") - # A zombie process should always be instantiable - zproc = psutil.Process(zpid) - # ...and at least its status always be querable - self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE) - # ...and it should be considered 'running' - self.assertTrue(zproc.is_running()) - # ...and as_dict() shouldn't crash - zproc.as_dict() - if hasattr(zproc, "rlimit"): - succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE) - succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE, - (5, 5)) - # set methods - succeed_or_zombie_p_exc(zproc.parent) - if hasattr(zproc, 'cpu_affinity'): - succeed_or_zombie_p_exc(zproc.cpu_affinity, [0]) - succeed_or_zombie_p_exc(zproc.nice, 0) - if hasattr(zproc, 'ionice'): - if LINUX: - succeed_or_zombie_p_exc(zproc.ionice, 2, 0) - else: - succeed_or_zombie_p_exc(zproc.ionice, 0) # Windows - if hasattr(zproc, 'rlimit'): - succeed_or_zombie_p_exc(zproc.rlimit, - psutil.RLIMIT_NOFILE, (5, 5)) - succeed_or_zombie_p_exc(zproc.suspend) - succeed_or_zombie_p_exc(zproc.resume) - succeed_or_zombie_p_exc(zproc.terminate) - succeed_or_zombie_p_exc(zproc.kill) - - # ...its parent should 'see' it - # edit: not true on BSD and OSX - # descendants = [x.pid for x in psutil.Process().children( - # recursive=True)] - # self.assertIn(zpid, descendants) - # XXX should we also assume ppid be usable? Note: this - # would be an important use case as the only way to get - # rid of a zombie is to kill its parent. - # self.assertEqual(zpid.ppid(), os.getpid()) - # ...and all other APIs should be able to deal with it - self.assertTrue(psutil.pid_exists(zpid)) - self.assertIn(zpid, psutil.pids()) - self.assertIn(zpid, [x.pid for x in psutil.process_iter()]) - psutil._pmap = {} - self.assertIn(zpid, [x.pid for x in psutil.process_iter()]) - finally: - reap_children(search_all=True) - - def test_pid_0(self): - # Process(0) is supposed to work on all platforms except Linux - if 0 not in psutil.pids(): - self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0) - return - - p = psutil.Process(0) - self.assertTrue(p.name()) - - if POSIX: - try: - self.assertEqual(p.uids().real, 0) - self.assertEqual(p.gids().real, 0) - except psutil.AccessDenied: - pass - - self.assertRaisesRegexp( - ValueError, "preventing sending signal to process with PID 0", - p.send_signal, signal.SIGTERM) - - self.assertIn(p.ppid(), (0, 1)) - # self.assertEqual(p.exe(), "") - p.cmdline() - try: - p.num_threads() - except psutil.AccessDenied: - pass - - try: - p.memory_info() - except psutil.AccessDenied: - pass - - try: - if POSIX: - self.assertEqual(p.username(), 'root') - elif WINDOWS: - self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM') - else: - p.username() - except psutil.AccessDenied: - pass - - self.assertIn(0, psutil.pids()) - self.assertTrue(psutil.pid_exists(0)) - - def test_Popen(self): - # Popen class test - # XXX this test causes a ResourceWarning on Python 3 because - # psutil.__subproc instance doesn't get propertly freed. - # Not sure what to do though. - cmd = [PYTHON, "-c", "import time; time.sleep(60);"] - proc = psutil.Popen(cmd, stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - try: - proc.name() - proc.stdin - self.assertTrue(hasattr(proc, 'name')) - self.assertTrue(hasattr(proc, 'stdin')) - self.assertTrue(dir(proc)) - self.assertRaises(AttributeError, getattr, proc, 'foo') - finally: - proc.kill() - proc.wait() - self.assertIsNotNone(proc.returncode) - - -# =================================================================== -# --- Featch all processes test -# =================================================================== - -class TestFetchAllProcesses(unittest.TestCase): - """Test which iterates over all running processes and performs - some sanity checks against Process API's returned values. - """ - - def setUp(self): - if POSIX: - import pwd - pall = pwd.getpwall() - self._uids = set([x.pw_uid for x in pall]) - self._usernames = set([x.pw_name for x in pall]) - - def test_fetch_all(self): - valid_procs = 0 - excluded_names = set([ - 'send_signal', 'suspend', 'resume', 'terminate', 'kill', 'wait', - 'as_dict', 'cpu_percent', 'parent', 'children', 'pid']) - if LINUX and not RLIMIT_SUPPORT: - excluded_names.add('rlimit') - attrs = [] - for name in dir(psutil.Process): - if name.startswith("_"): - continue - if name in excluded_names: - continue - attrs.append(name) - - default = object() - failures = [] - for name in attrs: - for p in psutil.process_iter(): - ret = default - try: - try: - args = () - attr = getattr(p, name, None) - if attr is not None and callable(attr): - if name == 'rlimit': - args = (psutil.RLIMIT_NOFILE,) - ret = attr(*args) - else: - ret = attr - valid_procs += 1 - except NotImplementedError: - msg = "%r was skipped because not implemented" % ( - self.__class__.__name__ + '.test_' + name) - warn(msg) - except (psutil.NoSuchProcess, psutil.AccessDenied) as err: - self.assertEqual(err.pid, p.pid) - if err.name: - # make sure exception's name attr is set - # with the actual process name - self.assertEqual(err.name, p.name()) - self.assertTrue(str(err)) - self.assertTrue(err.msg) - else: - if ret not in (0, 0.0, [], None, ''): - assert ret, ret - meth = getattr(self, name) - meth(ret) - except Exception as err: - s = '\n' + '=' * 70 + '\n' - s += "FAIL: test_%s (proc=%s" % (name, p) - if ret != default: - s += ", ret=%s)" % repr(ret) - s += ')\n' - s += '-' * 70 - s += "\n%s" % traceback.format_exc() - s = "\n".join((" " * 4) + i for i in s.splitlines()) - failures.append(s) - break - - if failures: - self.fail(''.join(failures)) - - # we should always have a non-empty list, not including PID 0 etc. - # special cases. - self.assertTrue(valid_procs > 0) - - def cmdline(self, ret): - pass - - def exe(self, ret): - if not ret: - self.assertEqual(ret, '') - else: - assert os.path.isabs(ret), ret - # Note: os.stat() may return False even if the file is there - # hence we skip the test, see: - # http://stackoverflow.com/questions/3112546/os-path-exists-lies - if POSIX and os.path.isfile(ret): - if hasattr(os, 'access') and hasattr(os, "X_OK"): - # XXX may fail on OSX - self.assertTrue(os.access(ret, os.X_OK)) - - def ppid(self, ret): - self.assertTrue(ret >= 0) - - def name(self, ret): - self.assertIsInstance(ret, (str, unicode)) - self.assertTrue(ret) - - def create_time(self, ret): - self.assertTrue(ret > 0) - # this can't be taken for granted on all platforms - # self.assertGreaterEqual(ret, psutil.boot_time()) - # make sure returned value can be pretty printed - # with strftime - time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret)) - - def uids(self, ret): - for uid in ret: - self.assertTrue(uid >= 0) - self.assertIn(uid, self._uids) - - def gids(self, ret): - # note: testing all gids as above seems not to be reliable for - # gid == 30 (nodoby); not sure why. - for gid in ret: - self.assertTrue(gid >= 0) - # self.assertIn(uid, self.gids - - def username(self, ret): - self.assertTrue(ret) - if POSIX: - self.assertIn(ret, self._usernames) - - def status(self, ret): - self.assertTrue(ret != "") - self.assertTrue(ret != '?') - self.assertIn(ret, VALID_PROC_STATUSES) - - def io_counters(self, ret): - for field in ret: - if field != -1: - self.assertTrue(field >= 0) - - def ionice(self, ret): - if LINUX: - self.assertTrue(ret.ioclass >= 0) - self.assertTrue(ret.value >= 0) - else: - self.assertTrue(ret >= 0) - self.assertIn(ret, (0, 1, 2)) - - def num_threads(self, ret): - self.assertTrue(ret >= 1) - - def threads(self, ret): - for t in ret: - self.assertTrue(t.id >= 0) - self.assertTrue(t.user_time >= 0) - self.assertTrue(t.system_time >= 0) - - def cpu_times(self, ret): - self.assertTrue(ret.user >= 0) - self.assertTrue(ret.system >= 0) - - def memory_info(self, ret): - self.assertTrue(ret.rss >= 0) - self.assertTrue(ret.vms >= 0) - - def memory_info_ex(self, ret): - for name in ret._fields: - self.assertTrue(getattr(ret, name) >= 0) - if POSIX and ret.vms != 0: - # VMS is always supposed to be the highest - for name in ret._fields: - if name != 'vms': - value = getattr(ret, name) - assert ret.vms > value, ret - elif WINDOWS: - assert ret.peak_wset >= ret.wset, ret - assert ret.peak_paged_pool >= ret.paged_pool, ret - assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret - assert ret.peak_pagefile >= ret.pagefile, ret - - def open_files(self, ret): - for f in ret: - if WINDOWS: - assert f.fd == -1, f - else: - self.assertIsInstance(f.fd, int) - assert os.path.isabs(f.path), f - assert os.path.isfile(f.path), f - - def num_fds(self, ret): - self.assertTrue(ret >= 0) - - def connections(self, ret): - self.assertEqual(len(ret), len(set(ret))) - for conn in ret: - check_connection_ntuple(conn) - - def cwd(self, ret): - if ret is not None: # BSD may return None - assert os.path.isabs(ret), ret - try: - st = os.stat(ret) - except OSError as err: - # directory has been removed in mean time - if err.errno != errno.ENOENT: - raise - else: - self.assertTrue(stat.S_ISDIR(st.st_mode)) - - def memory_percent(self, ret): - assert 0 <= ret <= 100, ret - - def is_running(self, ret): - self.assertTrue(ret) - - def cpu_affinity(self, ret): - assert ret != [], ret - - def terminal(self, ret): - if ret is not None: - assert os.path.isabs(ret), ret - assert os.path.exists(ret), ret - - def memory_maps(self, ret): - for nt in ret: - for fname in nt._fields: - value = getattr(nt, fname) - if fname == 'path': - if not value.startswith('['): - assert os.path.isabs(nt.path), nt.path - # commented as on Linux we might get - # '/foo/bar (deleted)' - # assert os.path.exists(nt.path), nt.path - elif fname in ('addr', 'perms'): - self.assertTrue(value) - else: - self.assertIsInstance(value, (int, long)) - assert value >= 0, value - - def num_handles(self, ret): - if WINDOWS: - self.assertGreaterEqual(ret, 0) - else: - self.assertGreaterEqual(ret, 0) - - def nice(self, ret): - if POSIX: - assert -20 <= ret <= 20, ret - else: - priorities = [getattr(psutil, x) for x in dir(psutil) - if x.endswith('_PRIORITY_CLASS')] - self.assertIn(ret, priorities) - - def num_ctx_switches(self, ret): - self.assertTrue(ret.voluntary >= 0) - self.assertTrue(ret.involuntary >= 0) - - def rlimit(self, ret): - self.assertEqual(len(ret), 2) - self.assertGreaterEqual(ret[0], -1) - self.assertGreaterEqual(ret[1], -1) - - -# =================================================================== -# --- Limited user tests -# =================================================================== - -@unittest.skipUnless(POSIX, "UNIX only") -@unittest.skipUnless(hasattr(os, 'getuid') and os.getuid() == 0, - "super user privileges are required") -class LimitedUserTestCase(TestProcess): - """Repeat the previous tests by using a limited user. - Executed only on UNIX and only if the user who run the test script - is root. - """ - # the uid/gid the test suite runs under - if hasattr(os, 'getuid'): - PROCESS_UID = os.getuid() - PROCESS_GID = os.getgid() - - def __init__(self, *args, **kwargs): - TestProcess.__init__(self, *args, **kwargs) - # re-define all existent test methods in order to - # ignore AccessDenied exceptions - for attr in [x for x in dir(self) if x.startswith('test')]: - meth = getattr(self, attr) - - def test_(self): - try: - meth() - except psutil.AccessDenied: - pass - setattr(self, attr, types.MethodType(test_, self)) - - def setUp(self): - safe_remove(TESTFN) - TestProcess.setUp(self) - os.setegid(1000) - os.seteuid(1000) - - def tearDown(self): - os.setegid(self.PROCESS_UID) - os.seteuid(self.PROCESS_GID) - TestProcess.tearDown(self) - - def test_nice(self): - try: - psutil.Process().nice(-1) - except psutil.AccessDenied: - pass - else: - self.fail("exception not raised") - - def test_zombie_process(self): - # causes problems if test test suite is run as root - pass - - -# =================================================================== -# --- Misc tests -# =================================================================== - -class TestMisc(unittest.TestCase): - """Misc / generic tests.""" - - def test_process__repr__(self, func=repr): - p = psutil.Process() - r = func(p) - self.assertIn("psutil.Process", r) - self.assertIn("pid=%s" % p.pid, r) - self.assertIn("name=", r) - self.assertIn(p.name(), r) - with mock.patch.object(psutil.Process, "name", - side_effect=psutil.ZombieProcess(os.getpid())): - p = psutil.Process() - r = func(p) - self.assertIn("pid=%s" % p.pid, r) - self.assertIn("zombie", r) - self.assertNotIn("name=", r) - with mock.patch.object(psutil.Process, "name", - side_effect=psutil.NoSuchProcess(os.getpid())): - p = psutil.Process() - r = func(p) - self.assertIn("pid=%s" % p.pid, r) - self.assertIn("terminated", r) - self.assertNotIn("name=", r) - - def test_process__str__(self): - self.test_process__repr__(func=str) - - def test_no_such_process__repr__(self, func=repr): - self.assertEqual( - repr(psutil.NoSuchProcess(321)), - "psutil.NoSuchProcess process no longer exists (pid=321)") - self.assertEqual( - repr(psutil.NoSuchProcess(321, name='foo')), - "psutil.NoSuchProcess process no longer exists (pid=321, " - "name='foo')") - self.assertEqual( - repr(psutil.NoSuchProcess(321, msg='foo')), - "psutil.NoSuchProcess foo") - - def test_zombie_process__repr__(self, func=repr): - self.assertEqual( - repr(psutil.ZombieProcess(321)), - "psutil.ZombieProcess process still exists but it's a zombie " - "(pid=321)") - self.assertEqual( - repr(psutil.ZombieProcess(321, name='foo')), - "psutil.ZombieProcess process still exists but it's a zombie " - "(pid=321, name='foo')") - self.assertEqual( - repr(psutil.ZombieProcess(321, name='foo', ppid=1)), - "psutil.ZombieProcess process still exists but it's a zombie " - "(pid=321, name='foo', ppid=1)") - self.assertEqual( - repr(psutil.ZombieProcess(321, msg='foo')), - "psutil.ZombieProcess foo") - - def test_access_denied__repr__(self, func=repr): - self.assertEqual( - repr(psutil.AccessDenied(321)), - "psutil.AccessDenied (pid=321)") - self.assertEqual( - repr(psutil.AccessDenied(321, name='foo')), - "psutil.AccessDenied (pid=321, name='foo')") - self.assertEqual( - repr(psutil.AccessDenied(321, msg='foo')), - "psutil.AccessDenied foo") - - def test_timeout_expired__repr__(self, func=repr): - self.assertEqual( - repr(psutil.TimeoutExpired(321)), - "psutil.TimeoutExpired timeout after 321 seconds") - self.assertEqual( - repr(psutil.TimeoutExpired(321, pid=111)), - "psutil.TimeoutExpired timeout after 321 seconds (pid=111)") - self.assertEqual( - repr(psutil.TimeoutExpired(321, pid=111, name='foo')), - "psutil.TimeoutExpired timeout after 321 seconds " - "(pid=111, name='foo')") - - def test_process__eq__(self): - p1 = psutil.Process() - p2 = psutil.Process() - self.assertEqual(p1, p2) - p2._ident = (0, 0) - self.assertNotEqual(p1, p2) - self.assertNotEqual(p1, 'foo') - - def test_process__hash__(self): - s = set([psutil.Process(), psutil.Process()]) - self.assertEqual(len(s), 1) - - def test__all__(self): - dir_psutil = dir(psutil) - for name in dir_psutil: - if name in ('callable', 'error', 'namedtuple', - 'long', 'test', 'NUM_CPUS', 'BOOT_TIME', - 'TOTAL_PHYMEM'): - continue - if not name.startswith('_'): - try: - __import__(name) - except ImportError: - if name not in psutil.__all__: - fun = getattr(psutil, name) - if fun is None: - continue - if (fun.__doc__ is not None and - 'deprecated' not in fun.__doc__.lower()): - self.fail('%r not in psutil.__all__' % name) - - # Import 'star' will break if __all__ is inconsistent, see: - # https://github.com/giampaolo/psutil/issues/656 - # Can't do `from psutil import *` as it won't work on python 3 - # so we simply iterate over __all__. - for name in psutil.__all__: - self.assertIn(name, dir_psutil) - - def test_version(self): - self.assertEqual('.'.join([str(x) for x in psutil.version_info]), - psutil.__version__) - - def test_memoize(self): - from psutil._common import memoize - - @memoize - def foo(*args, **kwargs): - "foo docstring" - calls.append(None) - return (args, kwargs) - - calls = [] - # no args - for x in range(2): - ret = foo() - expected = ((), {}) - self.assertEqual(ret, expected) - self.assertEqual(len(calls), 1) - # with args - for x in range(2): - ret = foo(1) - expected = ((1, ), {}) - self.assertEqual(ret, expected) - self.assertEqual(len(calls), 2) - # with args + kwargs - for x in range(2): - ret = foo(1, bar=2) - expected = ((1, ), {'bar': 2}) - self.assertEqual(ret, expected) - self.assertEqual(len(calls), 3) - # clear cache - foo.cache_clear() - ret = foo() - expected = ((), {}) - self.assertEqual(ret, expected) - self.assertEqual(len(calls), 4) - # docstring - self.assertEqual(foo.__doc__, "foo docstring") - - def test_isfile_strict(self): - from psutil._common import isfile_strict - this_file = os.path.abspath(__file__) - assert isfile_strict(this_file) - assert not isfile_strict(os.path.dirname(this_file)) - with mock.patch('psutil._common.os.stat', - side_effect=OSError(errno.EPERM, "foo")): - self.assertRaises(OSError, isfile_strict, this_file) - with mock.patch('psutil._common.os.stat', - side_effect=OSError(errno.EACCES, "foo")): - self.assertRaises(OSError, isfile_strict, this_file) - with mock.patch('psutil._common.os.stat', - side_effect=OSError(errno.EINVAL, "foo")): - assert not isfile_strict(this_file) - with mock.patch('psutil._common.stat.S_ISREG', return_value=False): - assert not isfile_strict(this_file) - - def test_serialization(self): - def check(ret): - if json is not None: - json.loads(json.dumps(ret)) - a = pickle.dumps(ret) - b = pickle.loads(a) - self.assertEqual(ret, b) - - check(psutil.Process().as_dict()) - check(psutil.virtual_memory()) - check(psutil.swap_memory()) - check(psutil.cpu_times()) - check(psutil.cpu_times_percent(interval=0)) - check(psutil.net_io_counters()) - if LINUX and not os.path.exists('/proc/diskstats'): - pass - else: - if not APPVEYOR: - check(psutil.disk_io_counters()) - check(psutil.disk_partitions()) - check(psutil.disk_usage(os.getcwd())) - check(psutil.users()) - - def test_setup_script(self): - here = os.path.abspath(os.path.dirname(__file__)) - setup_py = os.path.realpath(os.path.join(here, '..', 'setup.py')) - module = imp.load_source('setup', setup_py) - self.assertRaises(SystemExit, module.setup) - self.assertEqual(module.get_version(), psutil.__version__) - - def test_ad_on_process_creation(self): - # We are supposed to be able to instantiate Process also in case - # of zombie processes or access denied. - with mock.patch.object(psutil.Process, 'create_time', - side_effect=psutil.AccessDenied) as meth: - psutil.Process() - assert meth.called - with mock.patch.object(psutil.Process, 'create_time', - side_effect=psutil.ZombieProcess(1)) as meth: - psutil.Process() - assert meth.called - with mock.patch.object(psutil.Process, 'create_time', - side_effect=ValueError) as meth: - with self.assertRaises(ValueError): - psutil.Process() - assert meth.called - - -# =================================================================== -# --- Example script tests -# =================================================================== - -class TestExampleScripts(unittest.TestCase): - """Tests for scripts in the examples directory.""" - - def assert_stdout(self, exe, args=None): - exe = os.path.join(EXAMPLES_DIR, exe) - if args: - exe = exe + ' ' + args - try: - out = sh(sys.executable + ' ' + exe).strip() - except RuntimeError as err: - if 'AccessDenied' in str(err): - return str(err) - else: - raise - assert out, out - return out - - def assert_syntax(self, exe, args=None): - exe = os.path.join(EXAMPLES_DIR, exe) - with open(exe, 'r') as f: - src = f.read() - ast.parse(src) - - def test_check_presence(self): - # make sure all example scripts have a test method defined - meths = dir(self) - for name in os.listdir(EXAMPLES_DIR): - if name.endswith('.py'): - if 'test_' + os.path.splitext(name)[0] not in meths: - # self.assert_stdout(name) - self.fail('no test defined for %r script' - % os.path.join(EXAMPLES_DIR, name)) - - def test_disk_usage(self): - self.assert_stdout('disk_usage.py') - - def test_free(self): - self.assert_stdout('free.py') - - def test_meminfo(self): - self.assert_stdout('meminfo.py') - - def test_process_detail(self): - self.assert_stdout('process_detail.py') - - @unittest.skipIf(APPVEYOR, "can't find users on Appveyor") - def test_who(self): - self.assert_stdout('who.py') - - def test_ps(self): - self.assert_stdout('ps.py') - - def test_pstree(self): - self.assert_stdout('pstree.py') - - def test_netstat(self): - self.assert_stdout('netstat.py') - - @unittest.skipIf(TRAVIS, "permission denied on travis") - def test_ifconfig(self): - self.assert_stdout('ifconfig.py') - - def test_pmap(self): - self.assert_stdout('pmap.py', args=str(os.getpid())) - - @unittest.skipIf(ast is None, - 'ast module not available on this python version') - def test_killall(self): - self.assert_syntax('killall.py') - - @unittest.skipIf(ast is None, - 'ast module not available on this python version') - def test_nettop(self): - self.assert_syntax('nettop.py') - - @unittest.skipIf(ast is None, - 'ast module not available on this python version') - def test_top(self): - self.assert_syntax('top.py') - - @unittest.skipIf(ast is None, - 'ast module not available on this python version') - def test_iotop(self): - self.assert_syntax('iotop.py') - - def test_pidof(self): - output = self.assert_stdout('pidof.py %s' % psutil.Process().name()) - self.assertIn(str(os.getpid()), output) - - -def main(): - tests = [] - test_suite = unittest.TestSuite() - tests.append(TestSystemAPIs) - tests.append(TestProcess) - tests.append(TestFetchAllProcesses) - tests.append(TestMisc) - tests.append(TestExampleScripts) - tests.append(LimitedUserTestCase) - - if POSIX: - from _posix import PosixSpecificTestCase - tests.append(PosixSpecificTestCase) - - # import the specific platform test suite - stc = None - if LINUX: - from _linux import LinuxSpecificTestCase as stc - elif WINDOWS: - from _windows import WindowsSpecificTestCase as stc - from _windows import TestDualProcessImplementation - tests.append(TestDualProcessImplementation) - elif OSX: - from _osx import OSXSpecificTestCase as stc - elif BSD: - from _bsd import BSDSpecificTestCase as stc - elif SUNOS: - from _sunos import SunOSSpecificTestCase as stc - if stc is not None: - tests.append(stc) - - for test_class in tests: - test_suite.addTest(unittest.makeSuite(test_class)) - result = unittest.TextTestRunner(verbosity=2).run(test_suite) - return result.wasSuccessful() - -if __name__ == '__main__': - if not main(): - sys.exit(1) |