summaryrefslogtreecommitdiffstats
path: root/python/psutil/test
diff options
context:
space:
mode:
Diffstat (limited to 'python/psutil/test')
m---------python/psutil0
-rw-r--r--python/psutil/test/README.rst21
-rw-r--r--python/psutil/test/_bsd.py252
-rw-r--r--python/psutil/test/_linux.py473
-rw-r--r--python/psutil/test/_osx.py160
-rw-r--r--python/psutil/test/_posix.py258
-rw-r--r--python/psutil/test/_sunos.py48
-rw-r--r--python/psutil/test/_windows.py464
-rw-r--r--python/psutil/test/test_memory_leaks.py445
-rw-r--r--python/psutil/test/test_psutil.py3013
10 files changed, 0 insertions, 5134 deletions
diff --git a/python/psutil b/python/psutil
new file mode 160000
+Subproject a0967043b5819b2edc61d9a12306289d5e7f98c
diff --git a/python/psutil/test/README.rst b/python/psutil/test/README.rst
deleted file mode 100644
index 3f2a468ef..000000000
--- a/python/psutil/test/README.rst
+++ /dev/null
@@ -1,21 +0,0 @@
-- The recommended way to run tests (also on Windows) is to cd into parent
- directory and run ``make test``
-
-- Dependencies for running tests:
- - python 2.6: ipaddress, mock, unittest2
- - python 2.7: ipaddress, mock
- - python 3.2: ipaddress, mock
- - python 3.3: ipaddress
- - python >= 3.4: no deps required
-
-- The main test script is ``test_psutil.py``, which also imports platform-specific
- ``_*.py`` scripts (which should be ignored).
-
-- ``test_memory_leaks.py`` looks for memory leaks into C extension modules and must
- be run separately with ``make test-memleaks``.
-
-- To run tests on all supported Python version install tox (pip install tox)
- then run ``tox``.
-
-- Every time a commit is pushed tests are automatically run on Travis:
- https://travis-ci.org/giampaolo/psutil/
diff --git a/python/psutil/test/_bsd.py b/python/psutil/test/_bsd.py
deleted file mode 100644
index e4a3225d2..000000000
--- a/python/psutil/test/_bsd.py
+++ /dev/null
@@ -1,252 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-# TODO: add test for comparing connections with 'sockstat' cmd
-
-"""BSD specific tests. These are implicitly run by test_psutil.py."""
-
-import os
-import subprocess
-import sys
-import time
-
-import psutil
-
-from psutil._compat import PY3
-from test_psutil import (TOLERANCE, BSD, sh, get_test_subprocess, which,
- retry_before_failing, reap_children, unittest)
-
-
-PAGESIZE = os.sysconf("SC_PAGE_SIZE")
-if os.getuid() == 0: # muse requires root privileges
- MUSE_AVAILABLE = which('muse')
-else:
- MUSE_AVAILABLE = False
-
-
-def sysctl(cmdline):
- """Expects a sysctl command with an argument and parse the result
- returning only the value of interest.
- """
- result = sh("sysctl " + cmdline)
- result = result[result.find(": ") + 2:]
- try:
- return int(result)
- except ValueError:
- return result
-
-
-def muse(field):
- """Thin wrapper around 'muse' cmdline utility."""
- out = sh('muse')
- for line in out.split('\n'):
- if line.startswith(field):
- break
- else:
- raise ValueError("line not found")
- return int(line.split()[1])
-
-
-@unittest.skipUnless(BSD, "not a BSD system")
-class BSDSpecificTestCase(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
-
- @classmethod
- def tearDownClass(cls):
- reap_children()
-
- def test_boot_time(self):
- s = sysctl('sysctl kern.boottime')
- s = s[s.find(" sec = ") + 7:]
- s = s[:s.find(',')]
- btime = int(s)
- self.assertEqual(btime, psutil.boot_time())
-
- def test_process_create_time(self):
- cmdline = "ps -o lstart -p %s" % self.pid
- p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
- output = p.communicate()[0]
- if PY3:
- output = str(output, sys.stdout.encoding)
- start_ps = output.replace('STARTED', '').strip()
- start_psutil = psutil.Process(self.pid).create_time()
- start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
- time.localtime(start_psutil))
- self.assertEqual(start_ps, start_psutil)
-
- def test_disks(self):
- # test psutil.disk_usage() and psutil.disk_partitions()
- # against "df -a"
- def df(path):
- out = sh('df -k "%s"' % path).strip()
- lines = out.split('\n')
- lines.pop(0)
- line = lines.pop(0)
- dev, total, used, free = line.split()[:4]
- if dev == 'none':
- dev = ''
- total = int(total) * 1024
- used = int(used) * 1024
- free = int(free) * 1024
- return dev, total, used, free
-
- for part in psutil.disk_partitions(all=False):
- usage = psutil.disk_usage(part.mountpoint)
- dev, total, used, free = df(part.mountpoint)
- self.assertEqual(part.device, dev)
- self.assertEqual(usage.total, total)
- # 10 MB tollerance
- if abs(usage.free - free) > 10 * 1024 * 1024:
- self.fail("psutil=%s, df=%s" % (usage.free, free))
- if abs(usage.used - used) > 10 * 1024 * 1024:
- self.fail("psutil=%s, df=%s" % (usage.used, used))
-
- @retry_before_failing()
- def test_memory_maps(self):
- out = sh('procstat -v %s' % self.pid)
- maps = psutil.Process(self.pid).memory_maps(grouped=False)
- lines = out.split('\n')[1:]
- while lines:
- line = lines.pop()
- fields = line.split()
- _, start, stop, perms, res = fields[:5]
- map = maps.pop()
- self.assertEqual("%s-%s" % (start, stop), map.addr)
- self.assertEqual(int(res), map.rss)
- if not map.path.startswith('['):
- self.assertEqual(fields[10], map.path)
-
- def test_exe(self):
- out = sh('procstat -b %s' % self.pid)
- self.assertEqual(psutil.Process(self.pid).exe(),
- out.split('\n')[1].split()[-1])
-
- def test_cmdline(self):
- out = sh('procstat -c %s' % self.pid)
- self.assertEqual(' '.join(psutil.Process(self.pid).cmdline()),
- ' '.join(out.split('\n')[1].split()[2:]))
-
- def test_uids_gids(self):
- out = sh('procstat -s %s' % self.pid)
- euid, ruid, suid, egid, rgid, sgid = out.split('\n')[1].split()[2:8]
- p = psutil.Process(self.pid)
- uids = p.uids()
- gids = p.gids()
- self.assertEqual(uids.real, int(ruid))
- self.assertEqual(uids.effective, int(euid))
- self.assertEqual(uids.saved, int(suid))
- self.assertEqual(gids.real, int(rgid))
- self.assertEqual(gids.effective, int(egid))
- self.assertEqual(gids.saved, int(sgid))
-
- # --- virtual_memory(); tests against sysctl
-
- def test_vmem_total(self):
- syst = sysctl("sysctl vm.stats.vm.v_page_count") * PAGESIZE
- self.assertEqual(psutil.virtual_memory().total, syst)
-
- @retry_before_failing()
- def test_vmem_active(self):
- syst = sysctl("vm.stats.vm.v_active_count") * PAGESIZE
- self.assertAlmostEqual(psutil.virtual_memory().active, syst,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_inactive(self):
- syst = sysctl("vm.stats.vm.v_inactive_count") * PAGESIZE
- self.assertAlmostEqual(psutil.virtual_memory().inactive, syst,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_wired(self):
- syst = sysctl("vm.stats.vm.v_wire_count") * PAGESIZE
- self.assertAlmostEqual(psutil.virtual_memory().wired, syst,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_cached(self):
- syst = sysctl("vm.stats.vm.v_cache_count") * PAGESIZE
- self.assertAlmostEqual(psutil.virtual_memory().cached, syst,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_free(self):
- syst = sysctl("vm.stats.vm.v_free_count") * PAGESIZE
- self.assertAlmostEqual(psutil.virtual_memory().free, syst,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_buffers(self):
- syst = sysctl("vfs.bufspace")
- self.assertAlmostEqual(psutil.virtual_memory().buffers, syst,
- delta=TOLERANCE)
-
- def test_cpu_count_logical(self):
- syst = sysctl("hw.ncpu")
- self.assertEqual(psutil.cpu_count(logical=True), syst)
-
- # --- virtual_memory(); tests against muse
-
- @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
- def test_total(self):
- num = muse('Total')
- self.assertEqual(psutil.virtual_memory().total, num)
-
- @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
- @retry_before_failing()
- def test_active(self):
- num = muse('Active')
- self.assertAlmostEqual(psutil.virtual_memory().active, num,
- delta=TOLERANCE)
-
- @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
- @retry_before_failing()
- def test_inactive(self):
- num = muse('Inactive')
- self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
- delta=TOLERANCE)
-
- @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
- @retry_before_failing()
- def test_wired(self):
- num = muse('Wired')
- self.assertAlmostEqual(psutil.virtual_memory().wired, num,
- delta=TOLERANCE)
-
- @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
- @retry_before_failing()
- def test_cached(self):
- num = muse('Cache')
- self.assertAlmostEqual(psutil.virtual_memory().cached, num,
- delta=TOLERANCE)
-
- @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
- @retry_before_failing()
- def test_free(self):
- num = muse('Free')
- self.assertAlmostEqual(psutil.virtual_memory().free, num,
- delta=TOLERANCE)
-
- @unittest.skipUnless(MUSE_AVAILABLE, "muse cmdline tool is not available")
- @retry_before_failing()
- def test_buffers(self):
- num = muse('Buffer')
- self.assertAlmostEqual(psutil.virtual_memory().buffers, num,
- delta=TOLERANCE)
-
-
-def main():
- test_suite = unittest.TestSuite()
- test_suite.addTest(unittest.makeSuite(BSDSpecificTestCase))
- result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- return result.wasSuccessful()
-
-if __name__ == '__main__':
- if not main():
- sys.exit(1)
diff --git a/python/psutil/test/_linux.py b/python/psutil/test/_linux.py
deleted file mode 100644
index c1927ea8b..000000000
--- a/python/psutil/test/_linux.py
+++ /dev/null
@@ -1,473 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Linux specific tests. These are implicitly run by test_psutil.py."""
-
-from __future__ import division
-import contextlib
-import errno
-import fcntl
-import io
-import os
-import pprint
-import re
-import socket
-import struct
-import sys
-import tempfile
-import time
-import warnings
-
-try:
- from unittest import mock # py3
-except ImportError:
- import mock # requires "pip install mock"
-
-from test_psutil import POSIX, TOLERANCE, TRAVIS, LINUX
-from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess,
- retry_before_failing, get_kernel_version, unittest,
- which, call_until)
-
-import psutil
-import psutil._pslinux
-from psutil._compat import PY3, u
-
-
-SIOCGIFADDR = 0x8915
-SIOCGIFCONF = 0x8912
-SIOCGIFHWADDR = 0x8927
-
-
-def get_ipv4_address(ifname):
- ifname = ifname[:15]
- if PY3:
- ifname = bytes(ifname, 'ascii')
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- with contextlib.closing(s):
- return socket.inet_ntoa(
- fcntl.ioctl(s.fileno(),
- SIOCGIFADDR,
- struct.pack('256s', ifname))[20:24])
-
-
-def get_mac_address(ifname):
- ifname = ifname[:15]
- if PY3:
- ifname = bytes(ifname, 'ascii')
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- with contextlib.closing(s):
- info = fcntl.ioctl(
- s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
- if PY3:
- def ord(x):
- return x
- else:
- import __builtin__
- ord = __builtin__.ord
- return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
-
-
-@unittest.skipUnless(LINUX, "not a Linux system")
-class LinuxSpecificTestCase(unittest.TestCase):
-
- @unittest.skipIf(
- POSIX and not hasattr(os, 'statvfs'),
- reason="os.statvfs() function not available on this platform")
- @skip_on_not_implemented()
- def test_disks(self):
- # test psutil.disk_usage() and psutil.disk_partitions()
- # against "df -a"
- def df(path):
- out = sh('df -P -B 1 "%s"' % path).strip()
- lines = out.split('\n')
- lines.pop(0)
- line = lines.pop(0)
- dev, total, used, free = line.split()[:4]
- if dev == 'none':
- dev = ''
- total, used, free = int(total), int(used), int(free)
- return dev, total, used, free
-
- for part in psutil.disk_partitions(all=False):
- usage = psutil.disk_usage(part.mountpoint)
- dev, total, used, free = df(part.mountpoint)
- self.assertEqual(part.device, dev)
- self.assertEqual(usage.total, total)
- # 10 MB tollerance
- if abs(usage.free - free) > 10 * 1024 * 1024:
- self.fail("psutil=%s, df=%s" % (usage.free, free))
- if abs(usage.used - used) > 10 * 1024 * 1024:
- self.fail("psutil=%s, df=%s" % (usage.used, used))
-
- def test_memory_maps(self):
- sproc = get_test_subprocess()
- time.sleep(1)
- p = psutil.Process(sproc.pid)
- maps = p.memory_maps(grouped=False)
- pmap = sh('pmap -x %s' % p.pid).split('\n')
- # get rid of header
- del pmap[0]
- del pmap[0]
- while maps and pmap:
- this = maps.pop(0)
- other = pmap.pop(0)
- addr, _, rss, dirty, mode, path = other.split(None, 5)
- if not path.startswith('[') and not path.endswith(']'):
- self.assertEqual(path, os.path.basename(this.path))
- self.assertEqual(int(rss) * 1024, this.rss)
- # test only rwx chars, ignore 's' and 'p'
- self.assertEqual(mode[:3], this.perms[:3])
-
- def test_vmem_total(self):
- lines = sh('free').split('\n')[1:]
- total = int(lines[0].split()[1]) * 1024
- self.assertEqual(total, psutil.virtual_memory().total)
-
- @retry_before_failing()
- def test_vmem_used(self):
- lines = sh('free').split('\n')[1:]
- used = int(lines[0].split()[2]) * 1024
- self.assertAlmostEqual(used, psutil.virtual_memory().used,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_free(self):
- lines = sh('free').split('\n')[1:]
- free = int(lines[0].split()[3]) * 1024
- self.assertAlmostEqual(free, psutil.virtual_memory().free,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_buffers(self):
- lines = sh('free').split('\n')[1:]
- buffers = int(lines[0].split()[5]) * 1024
- self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_cached(self):
- lines = sh('free').split('\n')[1:]
- cached = int(lines[0].split()[6]) * 1024
- self.assertAlmostEqual(cached, psutil.virtual_memory().cached,
- delta=TOLERANCE)
-
- def test_swapmem_total(self):
- lines = sh('free').split('\n')[1:]
- total = int(lines[2].split()[1]) * 1024
- self.assertEqual(total, psutil.swap_memory().total)
-
- @retry_before_failing()
- def test_swapmem_used(self):
- lines = sh('free').split('\n')[1:]
- used = int(lines[2].split()[2]) * 1024
- self.assertAlmostEqual(used, psutil.swap_memory().used,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_swapmem_free(self):
- lines = sh('free').split('\n')[1:]
- free = int(lines[2].split()[3]) * 1024
- self.assertAlmostEqual(free, psutil.swap_memory().free,
- delta=TOLERANCE)
-
- @unittest.skipIf(TRAVIS, "unknown failure on travis")
- def test_cpu_times(self):
- fields = psutil.cpu_times()._fields
- kernel_ver = re.findall('\d+\.\d+\.\d+', os.uname()[2])[0]
- kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
- if kernel_ver_info >= (2, 6, 11):
- self.assertIn('steal', fields)
- else:
- self.assertNotIn('steal', fields)
- if kernel_ver_info >= (2, 6, 24):
- self.assertIn('guest', fields)
- else:
- self.assertNotIn('guest', fields)
- if kernel_ver_info >= (3, 2, 0):
- self.assertIn('guest_nice', fields)
- else:
- self.assertNotIn('guest_nice', fields)
-
- def test_net_if_addrs_ips(self):
- for name, addrs in psutil.net_if_addrs().items():
- for addr in addrs:
- if addr.family == psutil.AF_LINK:
- self.assertEqual(addr.address, get_mac_address(name))
- elif addr.family == socket.AF_INET:
- self.assertEqual(addr.address, get_ipv4_address(name))
- # TODO: test for AF_INET6 family
-
- @unittest.skipUnless(which('ip'), "'ip' utility not available")
- @unittest.skipIf(TRAVIS, "skipped on Travis")
- def test_net_if_names(self):
- out = sh("ip addr").strip()
- nics = psutil.net_if_addrs()
- found = 0
- for line in out.split('\n'):
- line = line.strip()
- if re.search("^\d+:", line):
- found += 1
- name = line.split(':')[1].strip()
- self.assertIn(name, nics.keys())
- self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
- pprint.pformat(nics), out))
-
- @unittest.skipUnless(which("nproc"), "nproc utility not available")
- def test_cpu_count_logical_w_nproc(self):
- num = int(sh("nproc --all"))
- self.assertEqual(psutil.cpu_count(logical=True), num)
-
- @unittest.skipUnless(which("lscpu"), "lscpu utility not available")
- def test_cpu_count_logical_w_lscpu(self):
- out = sh("lscpu -p")
- num = len([x for x in out.split('\n') if not x.startswith('#')])
- self.assertEqual(psutil.cpu_count(logical=True), num)
-
- # --- mocked tests
-
- def test_virtual_memory_mocked_warnings(self):
- with mock.patch('psutil._pslinux.open', create=True) as m:
- with warnings.catch_warnings(record=True) as ws:
- warnings.simplefilter("always")
- ret = psutil._pslinux.virtual_memory()
- assert m.called
- self.assertEqual(len(ws), 1)
- w = ws[0]
- self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
- self.assertIn(
- "'cached', 'active' and 'inactive' memory stats couldn't "
- "be determined", str(w.message))
- self.assertEqual(ret.cached, 0)
- self.assertEqual(ret.active, 0)
- self.assertEqual(ret.inactive, 0)
-
- def test_swap_memory_mocked_warnings(self):
- with mock.patch('psutil._pslinux.open', create=True) as m:
- with warnings.catch_warnings(record=True) as ws:
- warnings.simplefilter("always")
- ret = psutil._pslinux.swap_memory()
- assert m.called
- self.assertEqual(len(ws), 1)
- w = ws[0]
- self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
- self.assertIn(
- "'sin' and 'sout' swap memory stats couldn't "
- "be determined", str(w.message))
- self.assertEqual(ret.sin, 0)
- self.assertEqual(ret.sout, 0)
-
- def test_cpu_count_logical_mocked(self):
- import psutil._pslinux
- original = psutil._pslinux.cpu_count_logical()
- # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
- # order to cause the parsing of /proc/cpuinfo and /proc/stat.
- with mock.patch(
- 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
- self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
- assert m.called
-
- # Let's have open() return emtpy data and make sure None is
- # returned ('cause we mimick os.cpu_count()).
- with mock.patch('psutil._pslinux.open', create=True) as m:
- self.assertIsNone(psutil._pslinux.cpu_count_logical())
- self.assertEqual(m.call_count, 2)
- # /proc/stat should be the last one
- self.assertEqual(m.call_args[0][0], '/proc/stat')
-
- # Let's push this a bit further and make sure /proc/cpuinfo
- # parsing works as expected.
- with open('/proc/cpuinfo', 'rb') as f:
- cpuinfo_data = f.read()
- fake_file = io.BytesIO(cpuinfo_data)
- with mock.patch('psutil._pslinux.open',
- return_value=fake_file, create=True) as m:
- self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
-
- def test_cpu_count_physical_mocked(self):
- # Have open() return emtpy data and make sure None is returned
- # ('cause we want to mimick os.cpu_count())
- with mock.patch('psutil._pslinux.open', create=True) as m:
- self.assertIsNone(psutil._pslinux.cpu_count_physical())
- assert m.called
-
- def test_proc_open_files_file_gone(self):
- # simulates a file which gets deleted during open_files()
- # execution
- p = psutil.Process()
- files = p.open_files()
- with tempfile.NamedTemporaryFile():
- # give the kernel some time to see the new file
- call_until(p.open_files, "len(ret) != %i" % len(files))
- with mock.patch('psutil._pslinux.os.readlink',
- side_effect=OSError(errno.ENOENT, "")) as m:
- files = p.open_files()
- assert not files
- assert m.called
- # also simulate the case where os.readlink() returns EINVAL
- # in which case psutil is supposed to 'continue'
- with mock.patch('psutil._pslinux.os.readlink',
- side_effect=OSError(errno.EINVAL, "")) as m:
- self.assertEqual(p.open_files(), [])
- assert m.called
-
- def test_proc_terminal_mocked(self):
- with mock.patch('psutil._pslinux._psposix._get_terminal_map',
- return_value={}) as m:
- self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
- assert m.called
-
- def test_proc_num_ctx_switches_mocked(self):
- with mock.patch('psutil._pslinux.open', create=True) as m:
- self.assertRaises(
- NotImplementedError,
- psutil._pslinux.Process(os.getpid()).num_ctx_switches)
- assert m.called
-
- def test_proc_num_threads_mocked(self):
- with mock.patch('psutil._pslinux.open', create=True) as m:
- self.assertRaises(
- NotImplementedError,
- psutil._pslinux.Process(os.getpid()).num_threads)
- assert m.called
-
- def test_proc_ppid_mocked(self):
- with mock.patch('psutil._pslinux.open', create=True) as m:
- self.assertRaises(
- NotImplementedError,
- psutil._pslinux.Process(os.getpid()).ppid)
- assert m.called
-
- def test_proc_uids_mocked(self):
- with mock.patch('psutil._pslinux.open', create=True) as m:
- self.assertRaises(
- NotImplementedError,
- psutil._pslinux.Process(os.getpid()).uids)
- assert m.called
-
- def test_proc_gids_mocked(self):
- with mock.patch('psutil._pslinux.open', create=True) as m:
- self.assertRaises(
- NotImplementedError,
- psutil._pslinux.Process(os.getpid()).gids)
- assert m.called
-
- def test_proc_cmdline_mocked(self):
- # see: https://github.com/giampaolo/psutil/issues/639
- p = psutil.Process()
- fake_file = io.StringIO(u('foo\x00bar\x00'))
- with mock.patch('psutil._pslinux.open',
- return_value=fake_file, create=True) as m:
- p.cmdline() == ['foo', 'bar']
- assert m.called
- fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
- with mock.patch('psutil._pslinux.open',
- return_value=fake_file, create=True) as m:
- p.cmdline() == ['foo', 'bar', '']
- assert m.called
-
- def test_proc_io_counters_mocked(self):
- with mock.patch('psutil._pslinux.open', create=True) as m:
- self.assertRaises(
- NotImplementedError,
- psutil._pslinux.Process(os.getpid()).io_counters)
- assert m.called
-
- def test_boot_time_mocked(self):
- with mock.patch('psutil._pslinux.open', create=True) as m:
- self.assertRaises(
- RuntimeError,
- psutil._pslinux.boot_time)
- assert m.called
-
- def test_users_mocked(self):
- # Make sure ':0' and ':0.0' (returned by C ext) are converted
- # to 'localhost'.
- with mock.patch('psutil._pslinux.cext.users',
- return_value=[('giampaolo', 'pts/2', ':0',
- 1436573184.0, True)]) as m:
- self.assertEqual(psutil.users()[0].host, 'localhost')
- assert m.called
- with mock.patch('psutil._pslinux.cext.users',
- return_value=[('giampaolo', 'pts/2', ':0.0',
- 1436573184.0, True)]) as m:
- self.assertEqual(psutil.users()[0].host, 'localhost')
- assert m.called
- # ...otherwise it should be returned as-is
- with mock.patch('psutil._pslinux.cext.users',
- return_value=[('giampaolo', 'pts/2', 'foo',
- 1436573184.0, True)]) as m:
- self.assertEqual(psutil.users()[0].host, 'foo')
- assert m.called
-
- def test_disk_partitions_mocked(self):
- # Test that ZFS partitions are returned.
- with open("/proc/filesystems", "r") as f:
- data = f.read()
- if 'zfs' in data:
- for part in psutil.disk_partitions():
- if part.fstype == 'zfs':
- break
- else:
- self.fail("couldn't find any ZFS partition")
- else:
- # No ZFS partitions on this system. Let's fake one.
- fake_file = io.StringIO(u("nodev\tzfs\n"))
- with mock.patch('psutil._pslinux.open',
- return_value=fake_file, create=True) as m1:
- with mock.patch(
- 'psutil._pslinux.cext.disk_partitions',
- return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
- ret = psutil.disk_partitions()
- assert m1.called
- assert m2.called
- assert ret
- self.assertEqual(ret[0].fstype, 'zfs')
-
- # --- tests for specific kernel versions
-
- @unittest.skipUnless(
- get_kernel_version() >= (2, 6, 36),
- "prlimit() not available on this Linux kernel version")
- def test_prlimit_availability(self):
- # prlimit() should be available starting from kernel 2.6.36
- p = psutil.Process(os.getpid())
- p.rlimit(psutil.RLIMIT_NOFILE)
- # if prlimit() is supported *at least* these constants should
- # be available
- self.assertTrue(hasattr(psutil, "RLIM_INFINITY"))
- self.assertTrue(hasattr(psutil, "RLIMIT_AS"))
- self.assertTrue(hasattr(psutil, "RLIMIT_CORE"))
- self.assertTrue(hasattr(psutil, "RLIMIT_CPU"))
- self.assertTrue(hasattr(psutil, "RLIMIT_DATA"))
- self.assertTrue(hasattr(psutil, "RLIMIT_FSIZE"))
- self.assertTrue(hasattr(psutil, "RLIMIT_LOCKS"))
- self.assertTrue(hasattr(psutil, "RLIMIT_MEMLOCK"))
- self.assertTrue(hasattr(psutil, "RLIMIT_NOFILE"))
- self.assertTrue(hasattr(psutil, "RLIMIT_NPROC"))
- self.assertTrue(hasattr(psutil, "RLIMIT_RSS"))
- self.assertTrue(hasattr(psutil, "RLIMIT_STACK"))
-
- @unittest.skipUnless(
- get_kernel_version() >= (3, 0),
- "prlimit constants not available on this Linux kernel version")
- def test_resource_consts_kernel_v(self):
- # more recent constants
- self.assertTrue(hasattr(psutil, "RLIMIT_MSGQUEUE"))
- self.assertTrue(hasattr(psutil, "RLIMIT_NICE"))
- self.assertTrue(hasattr(psutil, "RLIMIT_RTPRIO"))
- self.assertTrue(hasattr(psutil, "RLIMIT_RTTIME"))
- self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING"))
-
-
-def main():
- test_suite = unittest.TestSuite()
- test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase))
- result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- return result.wasSuccessful()
-
-if __name__ == '__main__':
- if not main():
- sys.exit(1)
diff --git a/python/psutil/test/_osx.py b/python/psutil/test/_osx.py
deleted file mode 100644
index 6e6e4380e..000000000
--- a/python/psutil/test/_osx.py
+++ /dev/null
@@ -1,160 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""OSX specific tests. These are implicitly run by test_psutil.py."""
-
-import os
-import re
-import subprocess
-import sys
-import time
-
-import psutil
-
-from psutil._compat import PY3
-from test_psutil import (TOLERANCE, OSX, sh, get_test_subprocess,
- reap_children, retry_before_failing, unittest)
-
-
-PAGESIZE = os.sysconf("SC_PAGE_SIZE")
-
-
-def sysctl(cmdline):
- """Expects a sysctl command with an argument and parse the result
- returning only the value of interest.
- """
- p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
- result = p.communicate()[0].strip().split()[1]
- if PY3:
- result = str(result, sys.stdout.encoding)
- try:
- return int(result)
- except ValueError:
- return result
-
-
-def vm_stat(field):
- """Wrapper around 'vm_stat' cmdline utility."""
- out = sh('vm_stat')
- for line in out.split('\n'):
- if field in line:
- break
- else:
- raise ValueError("line not found")
- return int(re.search('\d+', line).group(0)) * PAGESIZE
-
-
-@unittest.skipUnless(OSX, "not an OSX system")
-class OSXSpecificTestCase(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
-
- @classmethod
- def tearDownClass(cls):
- reap_children()
-
- def test_process_create_time(self):
- cmdline = "ps -o lstart -p %s" % self.pid
- p = subprocess.Popen(cmdline, shell=1, stdout=subprocess.PIPE)
- output = p.communicate()[0]
- if PY3:
- output = str(output, sys.stdout.encoding)
- start_ps = output.replace('STARTED', '').strip()
- start_psutil = psutil.Process(self.pid).create_time()
- start_psutil = time.strftime("%a %b %e %H:%M:%S %Y",
- time.localtime(start_psutil))
- self.assertEqual(start_ps, start_psutil)
-
- def test_disks(self):
- # test psutil.disk_usage() and psutil.disk_partitions()
- # against "df -a"
- def df(path):
- out = sh('df -k "%s"' % path).strip()
- lines = out.split('\n')
- lines.pop(0)
- line = lines.pop(0)
- dev, total, used, free = line.split()[:4]
- if dev == 'none':
- dev = ''
- total = int(total) * 1024
- used = int(used) * 1024
- free = int(free) * 1024
- return dev, total, used, free
-
- for part in psutil.disk_partitions(all=False):
- usage = psutil.disk_usage(part.mountpoint)
- dev, total, used, free = df(part.mountpoint)
- self.assertEqual(part.device, dev)
- self.assertEqual(usage.total, total)
- # 10 MB tollerance
- if abs(usage.free - free) > 10 * 1024 * 1024:
- self.fail("psutil=%s, df=%s" % usage.free, free)
- if abs(usage.used - used) > 10 * 1024 * 1024:
- self.fail("psutil=%s, df=%s" % usage.used, used)
-
- # --- virtual mem
-
- def test_vmem_total(self):
- sysctl_hwphymem = sysctl('sysctl hw.memsize')
- self.assertEqual(sysctl_hwphymem, psutil.virtual_memory().total)
-
- @retry_before_failing()
- def test_vmem_free(self):
- num = vm_stat("free")
- self.assertAlmostEqual(psutil.virtual_memory().free, num,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_active(self):
- num = vm_stat("active")
- self.assertAlmostEqual(psutil.virtual_memory().active, num,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_inactive(self):
- num = vm_stat("inactive")
- self.assertAlmostEqual(psutil.virtual_memory().inactive, num,
- delta=TOLERANCE)
-
- @retry_before_failing()
- def test_vmem_wired(self):
- num = vm_stat("wired")
- self.assertAlmostEqual(psutil.virtual_memory().wired, num,
- delta=TOLERANCE)
-
- # --- swap mem
-
- def test_swapmem_sin(self):
- num = vm_stat("Pageins")
- self.assertEqual(psutil.swap_memory().sin, num)
-
- def test_swapmem_sout(self):
- num = vm_stat("Pageouts")
- self.assertEqual(psutil.swap_memory().sout, num)
-
- def test_swapmem_total(self):
- tot1 = psutil.swap_memory().total
- tot2 = 0
- # OSX uses multiple cache files:
- # http://en.wikipedia.org/wiki/Paging#OS_X
- for name in os.listdir("/var/vm/"):
- file = os.path.join("/var/vm", name)
- if os.path.isfile(file):
- tot2 += os.path.getsize(file)
- self.assertEqual(tot1, tot2)
-
-
-def main():
- test_suite = unittest.TestSuite()
- test_suite.addTest(unittest.makeSuite(OSXSpecificTestCase))
- result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- return result.wasSuccessful()
-
-if __name__ == '__main__':
- if not main():
- sys.exit(1)
diff --git a/python/psutil/test/_posix.py b/python/psutil/test/_posix.py
deleted file mode 100644
index e6c56aac3..000000000
--- a/python/psutil/test/_posix.py
+++ /dev/null
@@ -1,258 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""POSIX specific tests. These are implicitly run by test_psutil.py."""
-
-import datetime
-import os
-import subprocess
-import sys
-import time
-
-import psutil
-
-from psutil._compat import PY3, callable
-from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS
-from test_psutil import (get_test_subprocess, skip_on_access_denied,
- retry_before_failing, reap_children, sh, unittest,
- get_kernel_version, wait_for_pid)
-
-
-def ps(cmd):
- """Expects a ps command with a -o argument and parse the result
- returning only the value of interest.
- """
- if not LINUX:
- cmd = cmd.replace(" --no-headers ", " ")
- if SUNOS:
- cmd = cmd.replace("-o command", "-o comm")
- cmd = cmd.replace("-o start", "-o stime")
- p = subprocess.Popen(cmd, shell=1, stdout=subprocess.PIPE)
- output = p.communicate()[0].strip()
- if PY3:
- output = str(output, sys.stdout.encoding)
- if not LINUX:
- output = output.split('\n')[1].strip()
- try:
- return int(output)
- except ValueError:
- return output
-
-
-@unittest.skipUnless(POSIX, "not a POSIX system")
-class PosixSpecificTestCase(unittest.TestCase):
- """Compare psutil results against 'ps' command line utility."""
-
- @classmethod
- def setUpClass(cls):
- cls.pid = get_test_subprocess([PYTHON, "-E", "-O"],
- stdin=subprocess.PIPE).pid
- wait_for_pid(cls.pid)
-
- @classmethod
- def tearDownClass(cls):
- reap_children()
-
- # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps
-
- def test_process_parent_pid(self):
- ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
- ppid_psutil = psutil.Process(self.pid).ppid()
- self.assertEqual(ppid_ps, ppid_psutil)
-
- def test_process_uid(self):
- uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
- uid_psutil = psutil.Process(self.pid).uids().real
- self.assertEqual(uid_ps, uid_psutil)
-
- def test_process_gid(self):
- gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
- gid_psutil = psutil.Process(self.pid).gids().real
- self.assertEqual(gid_ps, gid_psutil)
-
- def test_process_username(self):
- username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
- username_psutil = psutil.Process(self.pid).username()
- self.assertEqual(username_ps, username_psutil)
-
- @skip_on_access_denied()
- @retry_before_failing()
- def test_process_rss_memory(self):
- # give python interpreter some time to properly initialize
- # so that the results are the same
- time.sleep(0.1)
- rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
- rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
- self.assertEqual(rss_ps, rss_psutil)
-
- @skip_on_access_denied()
- @retry_before_failing()
- def test_process_vsz_memory(self):
- # give python interpreter some time to properly initialize
- # so that the results are the same
- time.sleep(0.1)
- vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
- vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
- self.assertEqual(vsz_ps, vsz_psutil)
-
- def test_process_name(self):
- # use command + arg since "comm" keyword not supported on all platforms
- name_ps = ps("ps --no-headers -o command -p %s" % (
- self.pid)).split(' ')[0]
- # remove path if there is any, from the command
- name_ps = os.path.basename(name_ps).lower()
- name_psutil = psutil.Process(self.pid).name().lower()
- self.assertEqual(name_ps, name_psutil)
-
- @unittest.skipIf(OSX or BSD,
- 'ps -o start not available')
- def test_process_create_time(self):
- time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
- time_psutil = psutil.Process(self.pid).create_time()
- time_psutil_tstamp = datetime.datetime.fromtimestamp(
- time_psutil).strftime("%H:%M:%S")
- # sometimes ps shows the time rounded up instead of down, so we check
- # for both possible values
- round_time_psutil = round(time_psutil)
- round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
- round_time_psutil).strftime("%H:%M:%S")
- self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])
-
- def test_process_exe(self):
- ps_pathname = ps("ps --no-headers -o command -p %s" %
- self.pid).split(' ')[0]
- psutil_pathname = psutil.Process(self.pid).exe()
- try:
- self.assertEqual(ps_pathname, psutil_pathname)
- except AssertionError:
- # certain platforms such as BSD are more accurate returning:
- # "/usr/local/bin/python2.7"
- # ...instead of:
- # "/usr/local/bin/python"
- # We do not want to consider this difference in accuracy
- # an error.
- adjusted_ps_pathname = ps_pathname[:len(ps_pathname)]
- self.assertEqual(ps_pathname, adjusted_ps_pathname)
-
- def test_process_cmdline(self):
- ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
- psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
- if SUNOS:
- # ps on Solaris only shows the first part of the cmdline
- psutil_cmdline = psutil_cmdline.split(" ")[0]
- self.assertEqual(ps_cmdline, psutil_cmdline)
-
- @retry_before_failing()
- def test_pids(self):
- # Note: this test might fail if the OS is starting/killing
- # other processes in the meantime
- if SUNOS:
- cmd = ["ps", "ax"]
- else:
- cmd = ["ps", "ax", "-o", "pid"]
- p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
- output = p.communicate()[0].strip()
- if PY3:
- output = str(output, sys.stdout.encoding)
- pids_ps = []
- for line in output.split('\n')[1:]:
- if line:
- pid = int(line.split()[0].strip())
- pids_ps.append(pid)
- # remove ps subprocess pid which is supposed to be dead in meantime
- pids_ps.remove(p.pid)
- pids_psutil = psutil.pids()
- pids_ps.sort()
- pids_psutil.sort()
-
- # on OSX ps doesn't show pid 0
- if OSX and 0 not in pids_ps:
- pids_ps.insert(0, 0)
-
- if pids_ps != pids_psutil:
- difference = [x for x in pids_psutil if x not in pids_ps] + \
- [x for x in pids_ps if x not in pids_psutil]
- self.fail("difference: " + str(difference))
-
- # for some reason ifconfig -a does not report all interfaces
- # returned by psutil
- @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
- @unittest.skipIf(TRAVIS, "test not reliable on Travis")
- def test_nic_names(self):
- p = subprocess.Popen("ifconfig -a", shell=1, stdout=subprocess.PIPE)
- output = p.communicate()[0].strip()
- if PY3:
- output = str(output, sys.stdout.encoding)
- for nic in psutil.net_io_counters(pernic=True).keys():
- for line in output.split():
- if line.startswith(nic):
- break
- else:
- self.fail(
- "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
- nic, output))
-
- @retry_before_failing()
- def test_users(self):
- out = sh("who")
- lines = out.split('\n')
- users = [x.split()[0] for x in lines]
- self.assertEqual(len(users), len(psutil.users()))
- terminals = [x.split()[1] for x in lines]
- for u in psutil.users():
- self.assertTrue(u.name in users, u.name)
- self.assertTrue(u.terminal in terminals, u.terminal)
-
- def test_fds_open(self):
- # Note: this fails from time to time; I'm keen on thinking
- # it doesn't mean something is broken
- def call(p, attr):
- args = ()
- attr = getattr(p, name, None)
- if attr is not None and callable(attr):
- if name == 'rlimit':
- args = (psutil.RLIMIT_NOFILE,)
- attr(*args)
- else:
- attr
-
- p = psutil.Process(os.getpid())
- failures = []
- ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
- 'send_signal', 'wait', 'children', 'as_dict']
- if LINUX and get_kernel_version() < (2, 6, 36):
- ignored_names.append('rlimit')
- if LINUX and get_kernel_version() < (2, 6, 23):
- ignored_names.append('num_ctx_switches')
- for name in dir(psutil.Process):
- if (name.startswith('_') or name in ignored_names):
- continue
- else:
- try:
- num1 = p.num_fds()
- for x in range(2):
- call(p, name)
- num2 = p.num_fds()
- except psutil.AccessDenied:
- pass
- else:
- if abs(num2 - num1) > 1:
- fail = "failure while processing Process.%s method " \
- "(before=%s, after=%s)" % (name, num1, num2)
- failures.append(fail)
- if failures:
- self.fail('\n' + '\n'.join(failures))
-
-
-def main():
- test_suite = unittest.TestSuite()
- test_suite.addTest(unittest.makeSuite(PosixSpecificTestCase))
- result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- return result.wasSuccessful()
-
-if __name__ == '__main__':
- if not main():
- sys.exit(1)
diff --git a/python/psutil/test/_sunos.py b/python/psutil/test/_sunos.py
deleted file mode 100644
index 3d54ccd8c..000000000
--- a/python/psutil/test/_sunos.py
+++ /dev/null
@@ -1,48 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Sun OS specific tests. These are implicitly run by test_psutil.py."""
-
-import sys
-import os
-
-from test_psutil import SUNOS, sh, unittest
-import psutil
-
-
-@unittest.skipUnless(SUNOS, "not a SunOS system")
-class SunOSSpecificTestCase(unittest.TestCase):
-
- def test_swap_memory(self):
- out = sh('env PATH=/usr/sbin:/sbin:%s swap -l -k' % os.environ['PATH'])
- lines = out.strip().split('\n')[1:]
- if not lines:
- raise ValueError('no swap device(s) configured')
- total = free = 0
- for line in lines:
- line = line.split()
- t, f = line[-2:]
- t = t.replace('K', '')
- f = f.replace('K', '')
- total += int(int(t) * 1024)
- free += int(int(f) * 1024)
- used = total - free
-
- psutil_swap = psutil.swap_memory()
- self.assertEqual(psutil_swap.total, total)
- self.assertEqual(psutil_swap.used, used)
- self.assertEqual(psutil_swap.free, free)
-
-
-def main():
- test_suite = unittest.TestSuite()
- test_suite.addTest(unittest.makeSuite(SunOSSpecificTestCase))
- result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- return result.wasSuccessful()
-
-if __name__ == '__main__':
- if not main():
- sys.exit(1)
diff --git a/python/psutil/test/_windows.py b/python/psutil/test/_windows.py
deleted file mode 100644
index b7477bfeb..000000000
--- a/python/psutil/test/_windows.py
+++ /dev/null
@@ -1,464 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Windows specific tests. These are implicitly run by test_psutil.py."""
-
-import errno
-import os
-import platform
-import signal
-import subprocess
-import sys
-import time
-import traceback
-
-from test_psutil import APPVEYOR, WINDOWS
-from test_psutil import get_test_subprocess, reap_children, unittest
-
-import mock
-try:
- import wmi
-except ImportError:
- wmi = None
-try:
- import win32api
- import win32con
-except ImportError:
- win32api = win32con = None
-
-from psutil._compat import PY3, callable, long
-import psutil
-
-
-cext = psutil._psplatform.cext
-
-
-def wrap_exceptions(fun):
- def wrapper(self, *args, **kwargs):
- try:
- return fun(self, *args, **kwargs)
- except OSError as err:
- from psutil._pswindows import ACCESS_DENIED_SET
- if err.errno in ACCESS_DENIED_SET:
- raise psutil.AccessDenied(None, None)
- if err.errno == errno.ESRCH:
- raise psutil.NoSuchProcess(None, None)
- raise
- return wrapper
-
-
-@unittest.skipUnless(WINDOWS, "not a Windows system")
-class WindowsSpecificTestCase(unittest.TestCase):
-
- @classmethod
- def setUpClass(cls):
- cls.pid = get_test_subprocess().pid
-
- @classmethod
- def tearDownClass(cls):
- reap_children()
-
- def test_issue_24(self):
- p = psutil.Process(0)
- self.assertRaises(psutil.AccessDenied, p.kill)
-
- def test_special_pid(self):
- p = psutil.Process(4)
- self.assertEqual(p.name(), 'System')
- # use __str__ to access all common Process properties to check
- # that nothing strange happens
- str(p)
- p.username()
- self.assertTrue(p.create_time() >= 0.0)
- try:
- rss, vms = p.memory_info()
- except psutil.AccessDenied:
- # expected on Windows Vista and Windows 7
- if not platform.uname()[1] in ('vista', 'win-7', 'win7'):
- raise
- else:
- self.assertTrue(rss > 0)
-
- def test_send_signal(self):
- p = psutil.Process(self.pid)
- self.assertRaises(ValueError, p.send_signal, signal.SIGINT)
-
- def test_nic_names(self):
- p = subprocess.Popen(['ipconfig', '/all'], stdout=subprocess.PIPE)
- out = p.communicate()[0]
- if PY3:
- out = str(out, sys.stdout.encoding)
- nics = psutil.net_io_counters(pernic=True).keys()
- for nic in nics:
- if "pseudo-interface" in nic.replace(' ', '-').lower():
- continue
- if nic not in out:
- self.fail(
- "%r nic wasn't found in 'ipconfig /all' output" % nic)
-
- def test_exe(self):
- for p in psutil.process_iter():
- try:
- self.assertEqual(os.path.basename(p.exe()), p.name())
- except psutil.Error:
- pass
-
- # --- Process class tests
-
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- def test_process_name(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- self.assertEqual(p.name(), w.Caption)
-
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- def test_process_exe(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- # Note: wmi reports the exe as a lower case string.
- # Being Windows paths case-insensitive we ignore that.
- self.assertEqual(p.exe().lower(), w.ExecutablePath.lower())
-
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- def test_process_cmdline(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- self.assertEqual(' '.join(p.cmdline()),
- w.CommandLine.replace('"', ''))
-
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- def test_process_username(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- domain, _, username = w.GetOwner()
- username = "%s\\%s" % (domain, username)
- self.assertEqual(p.username(), username)
-
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- def test_process_rss_memory(self):
- time.sleep(0.1)
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- rss = p.memory_info().rss
- self.assertEqual(rss, int(w.WorkingSetSize))
-
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- def test_process_vms_memory(self):
- time.sleep(0.1)
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- vms = p.memory_info().vms
- # http://msdn.microsoft.com/en-us/library/aa394372(VS.85).aspx
- # ...claims that PageFileUsage is represented in Kilo
- # bytes but funnily enough on certain platforms bytes are
- # returned instead.
- wmi_usage = int(w.PageFileUsage)
- if (vms != wmi_usage) and (vms != wmi_usage * 1024):
- self.fail("wmi=%s, psutil=%s" % (wmi_usage, vms))
-
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- def test_process_create_time(self):
- w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- p = psutil.Process(self.pid)
- wmic_create = str(w.CreationDate.split('.')[0])
- psutil_create = time.strftime("%Y%m%d%H%M%S",
- time.localtime(p.create_time()))
- self.assertEqual(wmic_create, psutil_create)
-
- # --- psutil namespace functions and constants tests
-
- @unittest.skipUnless('NUMBER_OF_PROCESSORS' in os.environ,
- 'NUMBER_OF_PROCESSORS env var is not available')
- def test_cpu_count(self):
- num_cpus = int(os.environ['NUMBER_OF_PROCESSORS'])
- self.assertEqual(num_cpus, psutil.cpu_count())
-
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- def test_total_phymem(self):
- w = wmi.WMI().Win32_ComputerSystem()[0]
- self.assertEqual(int(w.TotalPhysicalMemory),
- psutil.virtual_memory().total)
-
- # @unittest.skipIf(wmi is None, "wmi module is not installed")
- # def test__UPTIME(self):
- # # _UPTIME constant is not public but it is used internally
- # # as value to return for pid 0 creation time.
- # # WMI behaves the same.
- # w = wmi.WMI().Win32_Process(ProcessId=self.pid)[0]
- # p = psutil.Process(0)
- # wmic_create = str(w.CreationDate.split('.')[0])
- # psutil_create = time.strftime("%Y%m%d%H%M%S",
- # time.localtime(p.create_time()))
- #
-
- # Note: this test is not very reliable
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- @unittest.skipIf(APPVEYOR, "test not relieable on appveyor")
- def test_pids(self):
- # Note: this test might fail if the OS is starting/killing
- # other processes in the meantime
- w = wmi.WMI().Win32_Process()
- wmi_pids = set([x.ProcessId for x in w])
- psutil_pids = set(psutil.pids())
- self.assertEqual(wmi_pids, psutil_pids)
-
- @unittest.skipIf(wmi is None, "wmi module is not installed")
- def test_disks(self):
- ps_parts = psutil.disk_partitions(all=True)
- wmi_parts = wmi.WMI().Win32_LogicalDisk()
- for ps_part in ps_parts:
- for wmi_part in wmi_parts:
- if ps_part.device.replace('\\', '') == wmi_part.DeviceID:
- if not ps_part.mountpoint:
- # this is usually a CD-ROM with no disk inserted
- break
- try:
- usage = psutil.disk_usage(ps_part.mountpoint)
- except OSError as err:
- if err.errno == errno.ENOENT:
- # usually this is the floppy
- break
- else:
- raise
- self.assertEqual(usage.total, int(wmi_part.Size))
- wmi_free = int(wmi_part.FreeSpace)
- self.assertEqual(usage.free, wmi_free)
- # 10 MB tollerance
- if abs(usage.free - wmi_free) > 10 * 1024 * 1024:
- self.fail("psutil=%s, wmi=%s" % (
- usage.free, wmi_free))
- break
- else:
- self.fail("can't find partition %s" % repr(ps_part))
-
- @unittest.skipIf(win32api is None, "pywin32 module is not installed")
- def test_num_handles(self):
- p = psutil.Process(os.getpid())
- before = p.num_handles()
- handle = win32api.OpenProcess(win32con.PROCESS_QUERY_INFORMATION,
- win32con.FALSE, os.getpid())
- after = p.num_handles()
- self.assertEqual(after, before + 1)
- win32api.CloseHandle(handle)
- self.assertEqual(p.num_handles(), before)
-
- @unittest.skipIf(win32api is None, "pywin32 module is not installed")
- def test_num_handles_2(self):
- # Note: this fails from time to time; I'm keen on thinking
- # it doesn't mean something is broken
- def call(p, attr):
- attr = getattr(p, name, None)
- if attr is not None and callable(attr):
- attr()
- else:
- attr
-
- p = psutil.Process(self.pid)
- failures = []
- for name in dir(psutil.Process):
- if name.startswith('_') \
- or name in ('terminate', 'kill', 'suspend', 'resume',
- 'nice', 'send_signal', 'wait', 'children',
- 'as_dict'):
- continue
- else:
- try:
- call(p, name)
- num1 = p.num_handles()
- call(p, name)
- num2 = p.num_handles()
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- pass
- else:
- if num2 > num1:
- fail = \
- "failure while processing Process.%s method " \
- "(before=%s, after=%s)" % (name, num1, num2)
- failures.append(fail)
- if failures:
- self.fail('\n' + '\n'.join(failures))
-
- def test_name_always_available(self):
- # On Windows name() is never supposed to raise AccessDenied,
- # see https://github.com/giampaolo/psutil/issues/627
- for p in psutil.process_iter():
- try:
- p.name()
- except psutil.NoSuchProcess():
- pass
-
-
-@unittest.skipUnless(WINDOWS, "not a Windows system")
-class TestDualProcessImplementation(unittest.TestCase):
- """
- Certain APIs on Windows have 2 internal implementations, one
- based on documented Windows APIs, another one based
- NtQuerySystemInformation() which gets called as fallback in
- case the first fails because of limited permission error.
- Here we test that the two methods return the exact same value,
- see:
- https://github.com/giampaolo/psutil/issues/304
- """
-
- fun_names = [
- # function name, tolerance
- ('proc_cpu_times', 0.2),
- ('proc_create_time', 0.5),
- ('proc_num_handles', 1), # 1 because impl #1 opens a handle
- ('proc_memory_info', 1024), # KB
- ('proc_io_counters', 0),
- ]
-
- def test_compare_values(self):
- def assert_ge_0(obj):
- if isinstance(obj, tuple):
- for value in obj:
- self.assertGreaterEqual(value, 0, msg=obj)
- elif isinstance(obj, (int, long, float)):
- self.assertGreaterEqual(obj, 0)
- else:
- assert 0 # case not handled which needs to be fixed
-
- def compare_with_tolerance(ret1, ret2, tolerance):
- if ret1 == ret2:
- return
- else:
- if isinstance(ret2, (int, long, float)):
- diff = abs(ret1 - ret2)
- self.assertLessEqual(diff, tolerance)
- elif isinstance(ret2, tuple):
- for a, b in zip(ret1, ret2):
- diff = abs(a - b)
- self.assertLessEqual(diff, tolerance)
-
- from psutil._pswindows import ntpinfo
- failures = []
- for p in psutil.process_iter():
- try:
- nt = ntpinfo(*cext.proc_info(p.pid))
- except psutil.NoSuchProcess:
- continue
- assert_ge_0(nt)
-
- for name, tolerance in self.fun_names:
- if name == 'proc_memory_info' and p.pid == os.getpid():
- continue
- if name == 'proc_create_time' and p.pid in (0, 4):
- continue
- meth = wrap_exceptions(getattr(cext, name))
- try:
- ret = meth(p.pid)
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- continue
- # compare values
- try:
- if name == 'proc_cpu_times':
- compare_with_tolerance(ret[0], nt.user_time, tolerance)
- compare_with_tolerance(ret[1],
- nt.kernel_time, tolerance)
- elif name == 'proc_create_time':
- compare_with_tolerance(ret, nt.create_time, tolerance)
- elif name == 'proc_num_handles':
- compare_with_tolerance(ret, nt.num_handles, tolerance)
- elif name == 'proc_io_counters':
- compare_with_tolerance(ret[0], nt.io_rcount, tolerance)
- compare_with_tolerance(ret[1], nt.io_wcount, tolerance)
- compare_with_tolerance(ret[2], nt.io_rbytes, tolerance)
- compare_with_tolerance(ret[3], nt.io_wbytes, tolerance)
- elif name == 'proc_memory_info':
- try:
- rawtupl = cext.proc_memory_info_2(p.pid)
- except psutil.NoSuchProcess:
- continue
- compare_with_tolerance(ret, rawtupl, tolerance)
- except AssertionError:
- trace = traceback.format_exc()
- msg = '%s\npid=%s, method=%r, ret_1=%r, ret_2=%r' % (
- trace, p.pid, name, ret, nt)
- failures.append(msg)
- break
-
- if failures:
- self.fail('\n\n'.join(failures))
-
- # ---
- # same tests as above but mimicks the AccessDenied failure of
- # the first (fast) method failing with AD.
- # TODO: currently does not take tolerance into account.
-
- def test_name(self):
- name = psutil.Process().name()
- with mock.patch("psutil._psplatform.cext.proc_exe",
- side_effect=psutil.AccessDenied(os.getpid())) as fun:
- psutil.Process().name() == name
- assert fun.called
-
- def test_memory_info(self):
- mem = psutil.Process().memory_info()
- with mock.patch("psutil._psplatform.cext.proc_memory_info",
- side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().memory_info() == mem
- assert fun.called
-
- def test_create_time(self):
- ctime = psutil.Process().create_time()
- with mock.patch("psutil._psplatform.cext.proc_create_time",
- side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().create_time() == ctime
- assert fun.called
-
- def test_cpu_times(self):
- cpu_times = psutil.Process().cpu_times()
- with mock.patch("psutil._psplatform.cext.proc_cpu_times",
- side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().cpu_times() == cpu_times
- assert fun.called
-
- def test_io_counters(self):
- io_counters = psutil.Process().io_counters()
- with mock.patch("psutil._psplatform.cext.proc_io_counters",
- side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().io_counters() == io_counters
- assert fun.called
-
- def test_num_handles(self):
- io_counters = psutil.Process().io_counters()
- with mock.patch("psutil._psplatform.cext.proc_io_counters",
- side_effect=OSError(errno.EPERM, "msg")) as fun:
- psutil.Process().io_counters() == io_counters
- assert fun.called
-
- # --- other tests
-
- def test_compare_name_exe(self):
- for p in psutil.process_iter():
- try:
- a = os.path.basename(p.exe())
- b = p.name()
- except (psutil.NoSuchProcess, psutil.AccessDenied):
- pass
- else:
- self.assertEqual(a, b)
-
- def test_zombies(self):
- # test that NPS is raised by the 2nd implementation in case a
- # process no longer exists
- ZOMBIE_PID = max(psutil.pids()) + 5000
- for name, _ in self.fun_names:
- meth = wrap_exceptions(getattr(cext, name))
- self.assertRaises(psutil.NoSuchProcess, meth, ZOMBIE_PID)
-
-
-def main():
- test_suite = unittest.TestSuite()
- test_suite.addTest(unittest.makeSuite(WindowsSpecificTestCase))
- test_suite.addTest(unittest.makeSuite(TestDualProcessImplementation))
- result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- return result.wasSuccessful()
-
-if __name__ == '__main__':
- if not main():
- sys.exit(1)
diff --git a/python/psutil/test/test_memory_leaks.py b/python/psutil/test/test_memory_leaks.py
deleted file mode 100644
index 6f02dc0ac..000000000
--- a/python/psutil/test/test_memory_leaks.py
+++ /dev/null
@@ -1,445 +0,0 @@
-#!/usr/bin/env python
-
-# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""
-A test script which attempts to detect memory leaks by calling C
-functions many times and compare process memory usage before and
-after the calls. It might produce false positives.
-"""
-
-import functools
-import gc
-import os
-import socket
-import sys
-import threading
-import time
-
-import psutil
-import psutil._common
-
-from psutil._compat import xrange, callable
-from test_psutil import (WINDOWS, POSIX, OSX, LINUX, SUNOS, BSD, TESTFN,
- RLIMIT_SUPPORT, TRAVIS)
-from test_psutil import (reap_children, supports_ipv6, safe_remove,
- get_test_subprocess)
-
-if sys.version_info < (2, 7):
- import unittest2 as unittest # https://pypi.python.org/pypi/unittest2
-else:
- import unittest
-
-
-LOOPS = 1000
-TOLERANCE = 4096
-SKIP_PYTHON_IMPL = True
-
-
-def skip_if_linux():
- return unittest.skipIf(LINUX and SKIP_PYTHON_IMPL,
- "not worth being tested on LINUX (pure python)")
-
-
-class Base(unittest.TestCase):
- proc = psutil.Process()
-
- def execute(self, function, *args, **kwargs):
- def call_many_times():
- for x in xrange(LOOPS - 1):
- self.call(function, *args, **kwargs)
- del x
- gc.collect()
- return self.get_mem()
-
- self.call(function, *args, **kwargs)
- self.assertEqual(gc.garbage, [])
- self.assertEqual(threading.active_count(), 1)
-
- # RSS comparison
- # step 1
- rss1 = call_many_times()
- # step 2
- rss2 = call_many_times()
-
- difference = rss2 - rss1
- if difference > TOLERANCE:
- # This doesn't necessarily mean we have a leak yet.
- # At this point we assume that after having called the
- # function so many times the memory usage is stabilized
- # and if there are no leaks it should not increase any
- # more.
- # Let's keep calling fun for 3 more seconds and fail if
- # we notice any difference.
- stop_at = time.time() + 3
- while True:
- self.call(function, *args, **kwargs)
- if time.time() >= stop_at:
- break
- del stop_at
- gc.collect()
- rss3 = self.get_mem()
- difference = rss3 - rss2
- if rss3 > rss2:
- self.fail("rss2=%s, rss3=%s, difference=%s"
- % (rss2, rss3, difference))
-
- def execute_w_exc(self, exc, function, *args, **kwargs):
- kwargs['_exc'] = exc
- self.execute(function, *args, **kwargs)
-
- def get_mem(self):
- return psutil.Process().memory_info()[0]
-
- def call(self, function, *args, **kwargs):
- raise NotImplementedError("must be implemented in subclass")
-
-
-class TestProcessObjectLeaks(Base):
- """Test leaks of Process class methods and properties"""
-
- def setUp(self):
- gc.collect()
-
- def tearDown(self):
- reap_children()
-
- def call(self, function, *args, **kwargs):
- if callable(function):
- if '_exc' in kwargs:
- exc = kwargs.pop('_exc')
- self.assertRaises(exc, function, *args, **kwargs)
- else:
- try:
- function(*args, **kwargs)
- except psutil.Error:
- pass
- else:
- meth = getattr(self.proc, function)
- if '_exc' in kwargs:
- exc = kwargs.pop('_exc')
- self.assertRaises(exc, meth, *args, **kwargs)
- else:
- try:
- meth(*args, **kwargs)
- except psutil.Error:
- pass
-
- @skip_if_linux()
- def test_name(self):
- self.execute('name')
-
- @skip_if_linux()
- def test_cmdline(self):
- self.execute('cmdline')
-
- @skip_if_linux()
- def test_exe(self):
- self.execute('exe')
-
- @skip_if_linux()
- def test_ppid(self):
- self.execute('ppid')
-
- @unittest.skipUnless(POSIX, "POSIX only")
- @skip_if_linux()
- def test_uids(self):
- self.execute('uids')
-
- @unittest.skipUnless(POSIX, "POSIX only")
- @skip_if_linux()
- def test_gids(self):
- self.execute('gids')
-
- @skip_if_linux()
- def test_status(self):
- self.execute('status')
-
- def test_nice_get(self):
- self.execute('nice')
-
- def test_nice_set(self):
- niceness = psutil.Process().nice()
- self.execute('nice', niceness)
-
- @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
- "Linux and Windows Vista only")
- def test_ionice_get(self):
- self.execute('ionice')
-
- @unittest.skipUnless(hasattr(psutil.Process, 'ionice'),
- "Linux and Windows Vista only")
- def test_ionice_set(self):
- if WINDOWS:
- value = psutil.Process().ionice()
- self.execute('ionice', value)
- else:
- from psutil._pslinux import cext
- self.execute('ionice', psutil.IOPRIO_CLASS_NONE)
- fun = functools.partial(cext.proc_ioprio_set, os.getpid(), -1, 0)
- self.execute_w_exc(OSError, fun)
-
- @unittest.skipIf(OSX or SUNOS, "feature not supported on this platform")
- @skip_if_linux()
- def test_io_counters(self):
- self.execute('io_counters')
-
- @unittest.skipUnless(WINDOWS, "not worth being tested on posix")
- def test_username(self):
- self.execute('username')
-
- @skip_if_linux()
- def test_create_time(self):
- self.execute('create_time')
-
- @skip_if_linux()
- def test_num_threads(self):
- self.execute('num_threads')
-
- @unittest.skipUnless(WINDOWS, "Windows only")
- def test_num_handles(self):
- self.execute('num_handles')
-
- @unittest.skipUnless(POSIX, "POSIX only")
- @skip_if_linux()
- def test_num_fds(self):
- self.execute('num_fds')
-
- @skip_if_linux()
- def test_threads(self):
- self.execute('threads')
-
- @skip_if_linux()
- def test_cpu_times(self):
- self.execute('cpu_times')
-
- @skip_if_linux()
- def test_memory_info(self):
- self.execute('memory_info')
-
- @skip_if_linux()
- def test_memory_info_ex(self):
- self.execute('memory_info_ex')
-
- @unittest.skipUnless(POSIX, "POSIX only")
- @skip_if_linux()
- def test_terminal(self):
- self.execute('terminal')
-
- @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
- "not worth being tested on POSIX (pure python)")
- def test_resume(self):
- self.execute('resume')
-
- @skip_if_linux()
- def test_cwd(self):
- self.execute('cwd')
-
- @unittest.skipUnless(WINDOWS or LINUX or BSD,
- "Windows or Linux or BSD only")
- def test_cpu_affinity_get(self):
- self.execute('cpu_affinity')
-
- @unittest.skipUnless(WINDOWS or LINUX or BSD,
- "Windows or Linux or BSD only")
- def test_cpu_affinity_set(self):
- affinity = psutil.Process().cpu_affinity()
- self.execute('cpu_affinity', affinity)
- if not TRAVIS:
- self.execute_w_exc(ValueError, 'cpu_affinity', [-1])
-
- @skip_if_linux()
- def test_open_files(self):
- safe_remove(TESTFN) # needed after UNIX socket test has run
- with open(TESTFN, 'w'):
- self.execute('open_files')
-
- # OSX implementation is unbelievably slow
- @unittest.skipIf(OSX, "OSX implementation is too slow")
- @skip_if_linux()
- def test_memory_maps(self):
- self.execute('memory_maps')
-
- @unittest.skipUnless(LINUX, "Linux only")
- @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
- "only available on Linux >= 2.6.36")
- def test_rlimit_get(self):
- self.execute('rlimit', psutil.RLIMIT_NOFILE)
-
- @unittest.skipUnless(LINUX, "Linux only")
- @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
- "only available on Linux >= 2.6.36")
- def test_rlimit_set(self):
- limit = psutil.Process().rlimit(psutil.RLIMIT_NOFILE)
- self.execute('rlimit', psutil.RLIMIT_NOFILE, limit)
- self.execute_w_exc(OSError, 'rlimit', -1)
-
- @skip_if_linux()
- # Windows implementation is based on a single system-wide function
- @unittest.skipIf(WINDOWS, "tested later")
- def test_connections(self):
- def create_socket(family, type):
- sock = socket.socket(family, type)
- sock.bind(('', 0))
- if type == socket.SOCK_STREAM:
- sock.listen(1)
- return sock
-
- socks = []
- socks.append(create_socket(socket.AF_INET, socket.SOCK_STREAM))
- socks.append(create_socket(socket.AF_INET, socket.SOCK_DGRAM))
- if supports_ipv6():
- socks.append(create_socket(socket.AF_INET6, socket.SOCK_STREAM))
- socks.append(create_socket(socket.AF_INET6, socket.SOCK_DGRAM))
- if hasattr(socket, 'AF_UNIX'):
- safe_remove(TESTFN)
- s = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
- s.bind(TESTFN)
- s.listen(1)
- socks.append(s)
- kind = 'all'
- # TODO: UNIX sockets are temporarily implemented by parsing
- # 'pfiles' cmd output; we don't want that part of the code to
- # be executed.
- if SUNOS:
- kind = 'inet'
- try:
- self.execute('connections', kind=kind)
- finally:
- for s in socks:
- s.close()
-
-
-p = get_test_subprocess()
-DEAD_PROC = psutil.Process(p.pid)
-DEAD_PROC.kill()
-DEAD_PROC.wait()
-del p
-
-
-class TestProcessObjectLeaksZombie(TestProcessObjectLeaks):
- """Same as above but looks for leaks occurring when dealing with
- zombie processes raising NoSuchProcess exception.
- """
- proc = DEAD_PROC
-
- def call(self, *args, **kwargs):
- try:
- TestProcessObjectLeaks.call(self, *args, **kwargs)
- except psutil.NoSuchProcess:
- pass
-
- if not POSIX:
- def test_kill(self):
- self.execute('kill')
-
- def test_terminate(self):
- self.execute('terminate')
-
- def test_suspend(self):
- self.execute('suspend')
-
- def test_resume(self):
- self.execute('resume')
-
- def test_wait(self):
- self.execute('wait')
-
-
-class TestModuleFunctionsLeaks(Base):
- """Test leaks of psutil module functions."""
-
- def setUp(self):
- gc.collect()
-
- def call(self, function, *args, **kwargs):
- fun = getattr(psutil, function)
- fun(*args, **kwargs)
-
- @skip_if_linux()
- def test_cpu_count_logical(self):
- psutil.cpu_count = psutil._psplatform.cpu_count_logical
- self.execute('cpu_count')
-
- @skip_if_linux()
- def test_cpu_count_physical(self):
- psutil.cpu_count = psutil._psplatform.cpu_count_physical
- self.execute('cpu_count')
-
- @skip_if_linux()
- def test_boot_time(self):
- self.execute('boot_time')
-
- @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
- "not worth being tested on POSIX (pure python)")
- def test_pid_exists(self):
- self.execute('pid_exists', os.getpid())
-
- def test_virtual_memory(self):
- self.execute('virtual_memory')
-
- # TODO: remove this skip when this gets fixed
- @unittest.skipIf(SUNOS,
- "not worth being tested on SUNOS (uses a subprocess)")
- def test_swap_memory(self):
- self.execute('swap_memory')
-
- @skip_if_linux()
- def test_cpu_times(self):
- self.execute('cpu_times')
-
- @skip_if_linux()
- def test_per_cpu_times(self):
- self.execute('cpu_times', percpu=True)
-
- @unittest.skipIf(POSIX and SKIP_PYTHON_IMPL,
- "not worth being tested on POSIX (pure python)")
- def test_disk_usage(self):
- self.execute('disk_usage', '.')
-
- def test_disk_partitions(self):
- self.execute('disk_partitions')
-
- @skip_if_linux()
- def test_net_io_counters(self):
- self.execute('net_io_counters')
-
- @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
- '/proc/diskstats not available on this Linux version')
- @skip_if_linux()
- def test_disk_io_counters(self):
- self.execute('disk_io_counters')
-
- # XXX - on Windows this produces a false positive
- @unittest.skipIf(WINDOWS, "XXX produces a false positive on Windows")
- def test_users(self):
- self.execute('users')
-
- @unittest.skipIf(LINUX,
- "not worth being tested on Linux (pure python)")
- def test_net_connections(self):
- self.execute('net_connections')
-
- def test_net_if_addrs(self):
- self.execute('net_if_addrs')
-
- @unittest.skipIf(TRAVIS, "EPERM on travis")
- def test_net_if_stats(self):
- self.execute('net_if_stats')
-
-
-def main():
- test_suite = unittest.TestSuite()
- tests = [TestProcessObjectLeaksZombie,
- TestProcessObjectLeaks,
- TestModuleFunctionsLeaks]
- for test in tests:
- test_suite.addTest(unittest.makeSuite(test))
- result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- return result.wasSuccessful()
-
-if __name__ == '__main__':
- if not main():
- sys.exit(1)
diff --git a/python/psutil/test/test_psutil.py b/python/psutil/test/test_psutil.py
deleted file mode 100644
index 3b2e3587a..000000000
--- a/python/psutil/test/test_psutil.py
+++ /dev/null
@@ -1,3013 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-
-# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""
-psutil test suite. Run it with:
-$ make test
-
-If you're on Python < 2.7 unittest2 module must be installed first:
-https://pypi.python.org/pypi/unittest2
-"""
-
-from __future__ import division
-
-import ast
-import atexit
-import collections
-import contextlib
-import datetime
-import errno
-import functools
-import imp
-import json
-import os
-import pickle
-import pprint
-import re
-import select
-import shutil
-import signal
-import socket
-import stat
-import subprocess
-import sys
-import tempfile
-import textwrap
-import threading
-import time
-import traceback
-import types
-import warnings
-from socket import AF_INET, SOCK_STREAM, SOCK_DGRAM
-try:
- import ipaddress # python >= 3.3
-except ImportError:
- ipaddress = None
-try:
- from unittest import mock # py3
-except ImportError:
- import mock # requires "pip install mock"
-
-import psutil
-from psutil._compat import PY3, callable, long, unicode
-
-if sys.version_info < (2, 7):
- import unittest2 as unittest # https://pypi.python.org/pypi/unittest2
-else:
- import unittest
-if sys.version_info >= (3, 4):
- import enum
-else:
- enum = None
-
-
-# ===================================================================
-# --- Constants
-# ===================================================================
-
-# conf for retry_before_failing() decorator
-NO_RETRIES = 10
-# bytes tolerance for OS memory related tests
-TOLERANCE = 500 * 1024 # 500KB
-# the timeout used in functions which have to wait
-GLOBAL_TIMEOUT = 3
-
-AF_INET6 = getattr(socket, "AF_INET6")
-AF_UNIX = getattr(socket, "AF_UNIX", None)
-PYTHON = os.path.realpath(sys.executable)
-DEVNULL = open(os.devnull, 'r+')
-TESTFN = os.path.join(os.getcwd(), "$testfile")
-TESTFN_UNICODE = TESTFN + "Æ’Å‘Å‘"
-TESTFILE_PREFIX = 'psutil-test-suite-'
-if not PY3:
- try:
- TESTFN_UNICODE = unicode(TESTFN_UNICODE, sys.getfilesystemencoding())
- except UnicodeDecodeError:
- TESTFN_UNICODE = TESTFN + "???"
-
-EXAMPLES_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__),
- '..', 'examples'))
-
-POSIX = os.name == 'posix'
-WINDOWS = os.name == 'nt'
-if WINDOWS:
- WIN_VISTA = (6, 0, 0)
-LINUX = sys.platform.startswith("linux")
-OSX = sys.platform.startswith("darwin")
-BSD = sys.platform.startswith("freebsd")
-SUNOS = sys.platform.startswith("sunos")
-VALID_PROC_STATUSES = [getattr(psutil, x) for x in dir(psutil)
- if x.startswith('STATUS_')]
-# whether we're running this test suite on Travis (https://travis-ci.org/)
-TRAVIS = bool(os.environ.get('TRAVIS'))
-# whether we're running this test suite on Appveyor for Windows
-# (http://www.appveyor.com/)
-APPVEYOR = bool(os.environ.get('APPVEYOR'))
-
-if TRAVIS or 'tox' in sys.argv[0]:
- import ipaddress
-if TRAVIS or APPVEYOR:
- GLOBAL_TIMEOUT = GLOBAL_TIMEOUT * 4
-
-
-# ===================================================================
-# --- Utility functions
-# ===================================================================
-
-def cleanup():
- reap_children(search_all=True)
- safe_remove(TESTFN)
- try:
- safe_rmdir(TESTFN_UNICODE)
- except UnicodeEncodeError:
- pass
- for path in _testfiles:
- safe_remove(path)
-
-atexit.register(cleanup)
-atexit.register(lambda: DEVNULL.close())
-
-
-_subprocesses_started = set()
-
-
-def get_test_subprocess(cmd=None, stdout=DEVNULL, stderr=DEVNULL,
- stdin=DEVNULL, wait=False):
- """Return a subprocess.Popen object to use in tests.
- By default stdout and stderr are redirected to /dev/null and the
- python interpreter is used as test process.
- If 'wait' is True attemps to make sure the process is in a
- reasonably initialized state.
- """
- if cmd is None:
- pyline = ""
- if wait:
- pyline += "open(r'%s', 'w'); " % TESTFN
- pyline += "import time; time.sleep(60);"
- cmd_ = [PYTHON, "-c", pyline]
- else:
- cmd_ = cmd
- sproc = subprocess.Popen(cmd_, stdout=stdout, stderr=stderr, stdin=stdin)
- if wait:
- if cmd is None:
- stop_at = time.time() + 3
- while stop_at > time.time():
- if os.path.exists(TESTFN):
- break
- time.sleep(0.001)
- else:
- warn("couldn't make sure test file was actually created")
- else:
- wait_for_pid(sproc.pid)
- _subprocesses_started.add(psutil.Process(sproc.pid))
- return sproc
-
-
-_testfiles = []
-
-
-def pyrun(src):
- """Run python code 'src' in a separate interpreter.
- Return interpreter subprocess.
- """
- if PY3:
- src = bytes(src, 'ascii')
- with tempfile.NamedTemporaryFile(
- prefix=TESTFILE_PREFIX, delete=False) as f:
- _testfiles.append(f.name)
- f.write(src)
- f.flush()
- subp = get_test_subprocess([PYTHON, f.name], stdout=None,
- stderr=None)
- wait_for_pid(subp.pid)
- return subp
-
-
-def warn(msg):
- """Raise a warning msg."""
- warnings.warn(msg, UserWarning)
-
-
-def sh(cmdline, stdout=subprocess.PIPE, stderr=subprocess.PIPE):
- """run cmd in a subprocess and return its output.
- raises RuntimeError on error.
- """
- p = subprocess.Popen(cmdline, shell=True, stdout=stdout, stderr=stderr)
- stdout, stderr = p.communicate()
- if p.returncode != 0:
- raise RuntimeError(stderr)
- if stderr:
- warn(stderr)
- if PY3:
- stdout = str(stdout, sys.stdout.encoding)
- return stdout.strip()
-
-
-def which(program):
- """Same as UNIX which command. Return None on command not found."""
- def is_exe(fpath):
- return os.path.isfile(fpath) and os.access(fpath, os.X_OK)
-
- fpath, fname = os.path.split(program)
- if fpath:
- if is_exe(program):
- return program
- else:
- for path in os.environ["PATH"].split(os.pathsep):
- exe_file = os.path.join(path, program)
- if is_exe(exe_file):
- return exe_file
- return None
-
-
-if POSIX:
- def get_kernel_version():
- """Return a tuple such as (2, 6, 36)."""
- s = ""
- uname = os.uname()[2]
- for c in uname:
- if c.isdigit() or c == '.':
- s += c
- else:
- break
- if not s:
- raise ValueError("can't parse %r" % uname)
- minor = 0
- micro = 0
- nums = s.split('.')
- major = int(nums[0])
- if len(nums) >= 2:
- minor = int(nums[1])
- if len(nums) >= 3:
- micro = int(nums[2])
- return (major, minor, micro)
-
-
-if LINUX:
- RLIMIT_SUPPORT = get_kernel_version() >= (2, 6, 36)
-else:
- RLIMIT_SUPPORT = False
-
-
-def wait_for_pid(pid, timeout=GLOBAL_TIMEOUT):
- """Wait for pid to show up in the process list then return.
- Used in the test suite to give time the sub process to initialize.
- """
- raise_at = time.time() + timeout
- while True:
- if pid in psutil.pids():
- # give it one more iteration to allow full initialization
- time.sleep(0.01)
- return
- time.sleep(0.0001)
- if time.time() >= raise_at:
- raise RuntimeError("Timed out")
-
-
-def wait_for_file(fname, timeout=GLOBAL_TIMEOUT, delete_file=True):
- """Wait for a file to be written on disk."""
- stop_at = time.time() + 3
- while time.time() < stop_at:
- try:
- with open(fname, "r") as f:
- data = f.read()
- if not data:
- continue
- if delete_file:
- os.remove(fname)
- return data
- except IOError:
- time.sleep(0.001)
- raise RuntimeError("timed out (couldn't read file)")
-
-
-def reap_children(search_all=False):
- """Kill any subprocess started by this test suite and ensure that
- no zombies stick around to hog resources and create problems when
- looking for refleaks.
- """
- global _subprocesses_started
- procs = _subprocesses_started.copy()
- if search_all:
- this_process = psutil.Process()
- for p in this_process.children(recursive=True):
- procs.add(p)
- for p in procs:
- try:
- p.terminate()
- except psutil.NoSuchProcess:
- pass
- gone, alive = psutil.wait_procs(procs, timeout=GLOBAL_TIMEOUT)
- for p in alive:
- warn("couldn't terminate process %s" % p)
- try:
- p.kill()
- except psutil.NoSuchProcess:
- pass
- _, alive = psutil.wait_procs(alive, timeout=GLOBAL_TIMEOUT)
- if alive:
- warn("couldn't not kill processes %s" % str(alive))
- _subprocesses_started = set(alive)
-
-
-def check_ip_address(addr, family):
- """Attempts to check IP address's validity."""
- if enum and PY3:
- assert isinstance(family, enum.IntEnum), family
- if family == AF_INET:
- octs = [int(x) for x in addr.split('.')]
- assert len(octs) == 4, addr
- for num in octs:
- assert 0 <= num <= 255, addr
- if ipaddress:
- if not PY3:
- addr = unicode(addr)
- ipaddress.IPv4Address(addr)
- elif family == AF_INET6:
- assert isinstance(addr, str), addr
- if ipaddress:
- if not PY3:
- addr = unicode(addr)
- ipaddress.IPv6Address(addr)
- elif family == psutil.AF_LINK:
- assert re.match('([a-fA-F0-9]{2}[:|\-]?){6}', addr) is not None, addr
- else:
- raise ValueError("unknown family %r", family)
-
-
-def check_connection_ntuple(conn):
- """Check validity of a connection namedtuple."""
- valid_conn_states = [getattr(psutil, x) for x in dir(psutil) if
- x.startswith('CONN_')]
- assert conn[0] == conn.fd
- assert conn[1] == conn.family
- assert conn[2] == conn.type
- assert conn[3] == conn.laddr
- assert conn[4] == conn.raddr
- assert conn[5] == conn.status
- assert conn.type in (SOCK_STREAM, SOCK_DGRAM), repr(conn.type)
- assert conn.family in (AF_INET, AF_INET6, AF_UNIX), repr(conn.family)
- assert conn.status in valid_conn_states, conn.status
-
- # check IP address and port sanity
- for addr in (conn.laddr, conn.raddr):
- if not addr:
- continue
- if conn.family in (AF_INET, AF_INET6):
- assert isinstance(addr, tuple), addr
- ip, port = addr
- assert isinstance(port, int), port
- assert 0 <= port <= 65535, port
- check_ip_address(ip, conn.family)
- elif conn.family == AF_UNIX:
- assert isinstance(addr, (str, None)), addr
- else:
- raise ValueError("unknown family %r", conn.family)
-
- if conn.family in (AF_INET, AF_INET6):
- # actually try to bind the local socket; ignore IPv6
- # sockets as their address might be represented as
- # an IPv4-mapped-address (e.g. "::127.0.0.1")
- # and that's rejected by bind()
- if conn.family == AF_INET:
- s = socket.socket(conn.family, conn.type)
- with contextlib.closing(s):
- try:
- s.bind((conn.laddr[0], 0))
- except socket.error as err:
- if err.errno != errno.EADDRNOTAVAIL:
- raise
- elif conn.family == AF_UNIX:
- assert not conn.raddr, repr(conn.raddr)
- assert conn.status == psutil.CONN_NONE, conn.status
-
- if getattr(conn, 'fd', -1) != -1:
- assert conn.fd > 0, conn
- if hasattr(socket, 'fromfd') and not WINDOWS:
- try:
- dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
- except (socket.error, OSError) as err:
- if err.args[0] != errno.EBADF:
- raise
- else:
- with contextlib.closing(dupsock):
- assert dupsock.family == conn.family
- assert dupsock.type == conn.type
-
-
-def safe_remove(file):
- "Convenience function for removing temporary test files"
- try:
- os.remove(file)
- except OSError as err:
- if err.errno != errno.ENOENT:
- # file is being used by another process
- if WINDOWS and isinstance(err, WindowsError) and err.errno == 13:
- return
- raise
-
-
-def safe_rmdir(dir):
- "Convenience function for removing temporary test directories"
- try:
- os.rmdir(dir)
- except OSError as err:
- if err.errno != errno.ENOENT:
- raise
-
-
-def call_until(fun, expr, timeout=GLOBAL_TIMEOUT):
- """Keep calling function for timeout secs and exit if eval()
- expression is True.
- """
- stop_at = time.time() + timeout
- while time.time() < stop_at:
- ret = fun()
- if eval(expr):
- return ret
- time.sleep(0.001)
- raise RuntimeError('timed out (ret=%r)' % ret)
-
-
-def retry_before_failing(ntimes=None):
- """Decorator which runs a test function and retries N times before
- actually failing.
- """
- def decorator(fun):
- @functools.wraps(fun)
- def wrapper(*args, **kwargs):
- for x in range(ntimes or NO_RETRIES):
- try:
- return fun(*args, **kwargs)
- except AssertionError:
- pass
- raise
- return wrapper
- return decorator
-
-
-def skip_on_access_denied(only_if=None):
- """Decorator to Ignore AccessDenied exceptions."""
- def decorator(fun):
- @functools.wraps(fun)
- def wrapper(*args, **kwargs):
- try:
- return fun(*args, **kwargs)
- except psutil.AccessDenied:
- if only_if is not None:
- if not only_if:
- raise
- msg = "%r was skipped because it raised AccessDenied" \
- % fun.__name__
- raise unittest.SkipTest(msg)
- return wrapper
- return decorator
-
-
-def skip_on_not_implemented(only_if=None):
- """Decorator to Ignore NotImplementedError exceptions."""
- def decorator(fun):
- @functools.wraps(fun)
- def wrapper(*args, **kwargs):
- try:
- return fun(*args, **kwargs)
- except NotImplementedError:
- if only_if is not None:
- if not only_if:
- raise
- msg = "%r was skipped because it raised NotImplementedError" \
- % fun.__name__
- raise unittest.SkipTest(msg)
- return wrapper
- return decorator
-
-
-def supports_ipv6():
- """Return True if IPv6 is supported on this platform."""
- if not socket.has_ipv6 or not hasattr(socket, "AF_INET6"):
- return False
- sock = None
- try:
- sock = socket.socket(AF_INET6, SOCK_STREAM)
- sock.bind(("::1", 0))
- except (socket.error, socket.gaierror):
- return False
- else:
- return True
- finally:
- if sock is not None:
- sock.close()
-
-
-if WINDOWS:
- def get_winver():
- wv = sys.getwindowsversion()
- if hasattr(wv, 'service_pack_major'): # python >= 2.7
- sp = wv.service_pack_major or 0
- else:
- r = re.search("\s\d$", wv[4])
- if r:
- sp = int(r.group(0))
- else:
- sp = 0
- return (wv[0], wv[1], sp)
-
-
-class ThreadTask(threading.Thread):
- """A thread object used for running process thread tests."""
-
- def __init__(self):
- threading.Thread.__init__(self)
- self._running = False
- self._interval = None
- self._flag = threading.Event()
-
- def __repr__(self):
- name = self.__class__.__name__
- return '<%s running=%s at %#x>' % (name, self._running, id(self))
-
- def start(self, interval=0.001):
- """Start thread and keep it running until an explicit
- stop() request. Polls for shutdown every 'timeout' seconds.
- """
- if self._running:
- raise ValueError("already started")
- self._interval = interval
- threading.Thread.start(self)
- self._flag.wait()
-
- def run(self):
- self._running = True
- self._flag.set()
- while self._running:
- time.sleep(self._interval)
-
- def stop(self):
- """Stop thread execution and and waits until it is stopped."""
- if not self._running:
- raise ValueError("already stopped")
- self._running = False
- self.join()
-
-
-# ===================================================================
-# --- System-related API tests
-# ===================================================================
-
-class TestSystemAPIs(unittest.TestCase):
- """Tests for system-related APIs."""
-
- def setUp(self):
- safe_remove(TESTFN)
-
- def tearDown(self):
- reap_children()
-
- def test_process_iter(self):
- self.assertIn(os.getpid(), [x.pid for x in psutil.process_iter()])
- sproc = get_test_subprocess()
- self.assertIn(sproc.pid, [x.pid for x in psutil.process_iter()])
- p = psutil.Process(sproc.pid)
- p.kill()
- p.wait()
- self.assertNotIn(sproc.pid, [x.pid for x in psutil.process_iter()])
-
- def test_wait_procs(self):
- def callback(p):
- l.append(p.pid)
-
- l = []
- sproc1 = get_test_subprocess()
- sproc2 = get_test_subprocess()
- sproc3 = get_test_subprocess()
- procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
- self.assertRaises(ValueError, psutil.wait_procs, procs, timeout=-1)
- self.assertRaises(TypeError, psutil.wait_procs, procs, callback=1)
- t = time.time()
- gone, alive = psutil.wait_procs(procs, timeout=0.01, callback=callback)
-
- self.assertLess(time.time() - t, 0.5)
- self.assertEqual(gone, [])
- self.assertEqual(len(alive), 3)
- self.assertEqual(l, [])
- for p in alive:
- self.assertFalse(hasattr(p, 'returncode'))
-
- @retry_before_failing(30)
- def test(procs, callback):
- gone, alive = psutil.wait_procs(procs, timeout=0.03,
- callback=callback)
- self.assertEqual(len(gone), 1)
- self.assertEqual(len(alive), 2)
- return gone, alive
-
- sproc3.terminate()
- gone, alive = test(procs, callback)
- self.assertIn(sproc3.pid, [x.pid for x in gone])
- if POSIX:
- self.assertEqual(gone.pop().returncode, signal.SIGTERM)
- else:
- self.assertEqual(gone.pop().returncode, 1)
- self.assertEqual(l, [sproc3.pid])
- for p in alive:
- self.assertFalse(hasattr(p, 'returncode'))
-
- @retry_before_failing(30)
- def test(procs, callback):
- gone, alive = psutil.wait_procs(procs, timeout=0.03,
- callback=callback)
- self.assertEqual(len(gone), 3)
- self.assertEqual(len(alive), 0)
- return gone, alive
-
- sproc1.terminate()
- sproc2.terminate()
- gone, alive = test(procs, callback)
- self.assertEqual(set(l), set([sproc1.pid, sproc2.pid, sproc3.pid]))
- for p in gone:
- self.assertTrue(hasattr(p, 'returncode'))
-
- def test_wait_procs_no_timeout(self):
- sproc1 = get_test_subprocess()
- sproc2 = get_test_subprocess()
- sproc3 = get_test_subprocess()
- procs = [psutil.Process(x.pid) for x in (sproc1, sproc2, sproc3)]
- for p in procs:
- p.terminate()
- gone, alive = psutil.wait_procs(procs)
-
- def test_boot_time(self):
- bt = psutil.boot_time()
- self.assertIsInstance(bt, float)
- self.assertGreater(bt, 0)
- self.assertLess(bt, time.time())
-
- @unittest.skipUnless(POSIX, 'posix only')
- def test_PAGESIZE(self):
- # pagesize is used internally to perform different calculations
- # and it's determined by using SC_PAGE_SIZE; make sure
- # getpagesize() returns the same value.
- import resource
- self.assertEqual(os.sysconf("SC_PAGE_SIZE"), resource.getpagesize())
-
- def test_virtual_memory(self):
- mem = psutil.virtual_memory()
- assert mem.total > 0, mem
- assert mem.available > 0, mem
- assert 0 <= mem.percent <= 100, mem
- assert mem.used > 0, mem
- assert mem.free >= 0, mem
- for name in mem._fields:
- value = getattr(mem, name)
- if name != 'percent':
- self.assertIsInstance(value, (int, long))
- if name != 'total':
- if not value >= 0:
- self.fail("%r < 0 (%s)" % (name, value))
- if value > mem.total:
- self.fail("%r > total (total=%s, %s=%s)"
- % (name, mem.total, name, value))
-
- def test_swap_memory(self):
- mem = psutil.swap_memory()
- assert mem.total >= 0, mem
- assert mem.used >= 0, mem
- if mem.total > 0:
- # likely a system with no swap partition
- assert mem.free > 0, mem
- else:
- assert mem.free == 0, mem
- assert 0 <= mem.percent <= 100, mem
- assert mem.sin >= 0, mem
- assert mem.sout >= 0, mem
-
- def test_pid_exists(self):
- sproc = get_test_subprocess(wait=True)
- self.assertTrue(psutil.pid_exists(sproc.pid))
- p = psutil.Process(sproc.pid)
- p.kill()
- p.wait()
- self.assertFalse(psutil.pid_exists(sproc.pid))
- self.assertFalse(psutil.pid_exists(-1))
- self.assertEqual(psutil.pid_exists(0), 0 in psutil.pids())
- # pid 0
- psutil.pid_exists(0) == 0 in psutil.pids()
-
- def test_pid_exists_2(self):
- reap_children()
- pids = psutil.pids()
- for pid in pids:
- try:
- assert psutil.pid_exists(pid)
- except AssertionError:
- # in case the process disappeared in meantime fail only
- # if it is no longer in psutil.pids()
- time.sleep(.1)
- if pid in psutil.pids():
- self.fail(pid)
- pids = range(max(pids) + 5000, max(pids) + 6000)
- for pid in pids:
- self.assertFalse(psutil.pid_exists(pid), msg=pid)
-
- def test_pids(self):
- plist = [x.pid for x in psutil.process_iter()]
- pidlist = psutil.pids()
- self.assertEqual(plist.sort(), pidlist.sort())
- # make sure every pid is unique
- self.assertEqual(len(pidlist), len(set(pidlist)))
-
- def test_test(self):
- # test for psutil.test() function
- stdout = sys.stdout
- sys.stdout = DEVNULL
- try:
- psutil.test()
- finally:
- sys.stdout = stdout
-
- def test_cpu_count(self):
- logical = psutil.cpu_count()
- self.assertEqual(logical, len(psutil.cpu_times(percpu=True)))
- self.assertGreaterEqual(logical, 1)
- #
- if LINUX:
- with open("/proc/cpuinfo") as fd:
- cpuinfo_data = fd.read()
- if "physical id" not in cpuinfo_data:
- raise unittest.SkipTest("cpuinfo doesn't include physical id")
- physical = psutil.cpu_count(logical=False)
- self.assertGreaterEqual(physical, 1)
- self.assertGreaterEqual(logical, physical)
-
- def test_sys_cpu_times(self):
- total = 0
- times = psutil.cpu_times()
- sum(times)
- for cp_time in times:
- self.assertIsInstance(cp_time, float)
- self.assertGreaterEqual(cp_time, 0.0)
- total += cp_time
- self.assertEqual(total, sum(times))
- str(times)
- if not WINDOWS:
- # CPU times are always supposed to increase over time or
- # remain the same but never go backwards, see:
- # https://github.com/giampaolo/psutil/issues/392
- last = psutil.cpu_times()
- for x in range(100):
- new = psutil.cpu_times()
- for field in new._fields:
- new_t = getattr(new, field)
- last_t = getattr(last, field)
- self.assertGreaterEqual(new_t, last_t,
- msg="%s %s" % (new_t, last_t))
- last = new
-
- def test_sys_cpu_times2(self):
- t1 = sum(psutil.cpu_times())
- time.sleep(0.1)
- t2 = sum(psutil.cpu_times())
- difference = t2 - t1
- if not difference >= 0.05:
- self.fail("difference %s" % difference)
-
- def test_sys_per_cpu_times(self):
- for times in psutil.cpu_times(percpu=True):
- total = 0
- sum(times)
- for cp_time in times:
- self.assertIsInstance(cp_time, float)
- self.assertGreaterEqual(cp_time, 0.0)
- total += cp_time
- self.assertEqual(total, sum(times))
- str(times)
- self.assertEqual(len(psutil.cpu_times(percpu=True)[0]),
- len(psutil.cpu_times(percpu=False)))
-
- # Note: in theory CPU times are always supposed to increase over
- # time or remain the same but never go backwards. In practice
- # sometimes this is not the case.
- # This issue seemd to be afflict Windows:
- # https://github.com/giampaolo/psutil/issues/392
- # ...but it turns out also Linux (rarely) behaves the same.
- # last = psutil.cpu_times(percpu=True)
- # for x in range(100):
- # new = psutil.cpu_times(percpu=True)
- # for index in range(len(new)):
- # newcpu = new[index]
- # lastcpu = last[index]
- # for field in newcpu._fields:
- # new_t = getattr(newcpu, field)
- # last_t = getattr(lastcpu, field)
- # self.assertGreaterEqual(
- # new_t, last_t, msg="%s %s" % (lastcpu, newcpu))
- # last = new
-
- def test_sys_per_cpu_times_2(self):
- tot1 = psutil.cpu_times(percpu=True)
- stop_at = time.time() + 0.1
- while True:
- if time.time() >= stop_at:
- break
- tot2 = psutil.cpu_times(percpu=True)
- for t1, t2 in zip(tot1, tot2):
- t1, t2 = sum(t1), sum(t2)
- difference = t2 - t1
- if difference >= 0.05:
- return
- self.fail()
-
- def _test_cpu_percent(self, percent, last_ret, new_ret):
- try:
- self.assertIsInstance(percent, float)
- self.assertGreaterEqual(percent, 0.0)
- self.assertIsNot(percent, -0.0)
- self.assertLessEqual(percent, 100.0 * psutil.cpu_count())
- except AssertionError as err:
- raise AssertionError("\n%s\nlast=%s\nnew=%s" % (
- err, pprint.pformat(last_ret), pprint.pformat(new_ret)))
-
- def test_sys_cpu_percent(self):
- last = psutil.cpu_percent(interval=0.001)
- for x in range(100):
- new = psutil.cpu_percent(interval=None)
- self._test_cpu_percent(new, last, new)
- last = new
-
- def test_sys_per_cpu_percent(self):
- last = psutil.cpu_percent(interval=0.001, percpu=True)
- self.assertEqual(len(last), psutil.cpu_count())
- for x in range(100):
- new = psutil.cpu_percent(interval=None, percpu=True)
- for percent in new:
- self._test_cpu_percent(percent, last, new)
- last = new
-
- def test_sys_cpu_times_percent(self):
- last = psutil.cpu_times_percent(interval=0.001)
- for x in range(100):
- new = psutil.cpu_times_percent(interval=None)
- for percent in new:
- self._test_cpu_percent(percent, last, new)
- self._test_cpu_percent(sum(new), last, new)
- last = new
-
- def test_sys_per_cpu_times_percent(self):
- last = psutil.cpu_times_percent(interval=0.001, percpu=True)
- self.assertEqual(len(last), psutil.cpu_count())
- for x in range(100):
- new = psutil.cpu_times_percent(interval=None, percpu=True)
- for cpu in new:
- for percent in cpu:
- self._test_cpu_percent(percent, last, new)
- self._test_cpu_percent(sum(cpu), last, new)
- last = new
-
- def test_sys_per_cpu_times_percent_negative(self):
- # see: https://github.com/giampaolo/psutil/issues/645
- psutil.cpu_times_percent(percpu=True)
- zero_times = [x._make([0 for x in range(len(x._fields))])
- for x in psutil.cpu_times(percpu=True)]
- with mock.patch('psutil.cpu_times', return_value=zero_times):
- for cpu in psutil.cpu_times_percent(percpu=True):
- for percent in cpu:
- self._test_cpu_percent(percent, None, None)
-
- @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
- "os.statvfs() function not available on this platform")
- def test_disk_usage(self):
- usage = psutil.disk_usage(os.getcwd())
- assert usage.total > 0, usage
- assert usage.used > 0, usage
- assert usage.free > 0, usage
- assert usage.total > usage.used, usage
- assert usage.total > usage.free, usage
- assert 0 <= usage.percent <= 100, usage.percent
- if hasattr(shutil, 'disk_usage'):
- # py >= 3.3, see: http://bugs.python.org/issue12442
- shutil_usage = shutil.disk_usage(os.getcwd())
- tolerance = 5 * 1024 * 1024 # 5MB
- self.assertEqual(usage.total, shutil_usage.total)
- self.assertAlmostEqual(usage.free, shutil_usage.free,
- delta=tolerance)
- self.assertAlmostEqual(usage.used, shutil_usage.used,
- delta=tolerance)
-
- # if path does not exist OSError ENOENT is expected across
- # all platforms
- fname = tempfile.mktemp()
- try:
- psutil.disk_usage(fname)
- except OSError as err:
- if err.args[0] != errno.ENOENT:
- raise
- else:
- self.fail("OSError not raised")
-
- @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
- "os.statvfs() function not available on this platform")
- def test_disk_usage_unicode(self):
- # see: https://github.com/giampaolo/psutil/issues/416
- # XXX this test is not really reliable as it always fails on
- # Python 3.X (2.X is fine)
- try:
- safe_rmdir(TESTFN_UNICODE)
- os.mkdir(TESTFN_UNICODE)
- psutil.disk_usage(TESTFN_UNICODE)
- safe_rmdir(TESTFN_UNICODE)
- except UnicodeEncodeError:
- pass
-
- @unittest.skipIf(POSIX and not hasattr(os, 'statvfs'),
- "os.statvfs() function not available on this platform")
- @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
- def test_disk_partitions(self):
- # all = False
- ls = psutil.disk_partitions(all=False)
- # on travis we get:
- # self.assertEqual(p.cpu_affinity(), [n])
- # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, 7,... != [0]
- self.assertTrue(ls, msg=ls)
- for disk in ls:
- if WINDOWS and 'cdrom' in disk.opts:
- continue
- if not POSIX:
- assert os.path.exists(disk.device), disk
- else:
- # we cannot make any assumption about this, see:
- # http://goo.gl/p9c43
- disk.device
- if SUNOS:
- # on solaris apparently mount points can also be files
- assert os.path.exists(disk.mountpoint), disk
- else:
- assert os.path.isdir(disk.mountpoint), disk
- assert disk.fstype, disk
- self.assertIsInstance(disk.opts, str)
-
- # all = True
- ls = psutil.disk_partitions(all=True)
- self.assertTrue(ls, msg=ls)
- for disk in psutil.disk_partitions(all=True):
- if not WINDOWS:
- try:
- os.stat(disk.mountpoint)
- except OSError as err:
- # http://mail.python.org/pipermail/python-dev/
- # 2012-June/120787.html
- if err.errno not in (errno.EPERM, errno.EACCES):
- raise
- else:
- if SUNOS:
- # on solaris apparently mount points can also be files
- assert os.path.exists(disk.mountpoint), disk
- else:
- assert os.path.isdir(disk.mountpoint), disk
- self.assertIsInstance(disk.fstype, str)
- self.assertIsInstance(disk.opts, str)
-
- def find_mount_point(path):
- path = os.path.abspath(path)
- while not os.path.ismount(path):
- path = os.path.dirname(path)
- return path
-
- mount = find_mount_point(__file__)
- mounts = [x.mountpoint for x in psutil.disk_partitions(all=True)]
- self.assertIn(mount, mounts)
- psutil.disk_usage(mount)
-
- @skip_on_access_denied()
- def test_net_connections(self):
- def check(cons, families, types_):
- for conn in cons:
- self.assertIn(conn.family, families, msg=conn)
- if conn.family != getattr(socket, 'AF_UNIX', object()):
- self.assertIn(conn.type, types_, msg=conn)
-
- from psutil._common import conn_tmap
- for kind, groups in conn_tmap.items():
- if SUNOS and kind == 'unix':
- continue
- families, types_ = groups
- cons = psutil.net_connections(kind)
- self.assertEqual(len(cons), len(set(cons)))
- check(cons, families, types_)
-
- def test_net_io_counters(self):
- def check_ntuple(nt):
- self.assertEqual(nt[0], nt.bytes_sent)
- self.assertEqual(nt[1], nt.bytes_recv)
- self.assertEqual(nt[2], nt.packets_sent)
- self.assertEqual(nt[3], nt.packets_recv)
- self.assertEqual(nt[4], nt.errin)
- self.assertEqual(nt[5], nt.errout)
- self.assertEqual(nt[6], nt.dropin)
- self.assertEqual(nt[7], nt.dropout)
- assert nt.bytes_sent >= 0, nt
- assert nt.bytes_recv >= 0, nt
- assert nt.packets_sent >= 0, nt
- assert nt.packets_recv >= 0, nt
- assert nt.errin >= 0, nt
- assert nt.errout >= 0, nt
- assert nt.dropin >= 0, nt
- assert nt.dropout >= 0, nt
-
- ret = psutil.net_io_counters(pernic=False)
- check_ntuple(ret)
- ret = psutil.net_io_counters(pernic=True)
- self.assertNotEqual(ret, [])
- for key in ret:
- self.assertTrue(key)
- check_ntuple(ret[key])
-
- def test_net_if_addrs(self):
- nics = psutil.net_if_addrs()
- assert nics, nics
-
- # Not reliable on all platforms (net_if_addrs() reports more
- # interfaces).
- # self.assertEqual(sorted(nics.keys()),
- # sorted(psutil.net_io_counters(pernic=True).keys()))
-
- families = set([socket.AF_INET, AF_INET6, psutil.AF_LINK])
- for nic, addrs in nics.items():
- self.assertEqual(len(set(addrs)), len(addrs))
- for addr in addrs:
- self.assertIsInstance(addr.family, int)
- self.assertIsInstance(addr.address, str)
- self.assertIsInstance(addr.netmask, (str, type(None)))
- self.assertIsInstance(addr.broadcast, (str, type(None)))
- self.assertIn(addr.family, families)
- if sys.version_info >= (3, 4):
- self.assertIsInstance(addr.family, enum.IntEnum)
- if addr.family == socket.AF_INET:
- s = socket.socket(addr.family)
- with contextlib.closing(s):
- s.bind((addr.address, 0))
- elif addr.family == socket.AF_INET6:
- info = socket.getaddrinfo(
- addr.address, 0, socket.AF_INET6, socket.SOCK_STREAM,
- 0, socket.AI_PASSIVE)[0]
- af, socktype, proto, canonname, sa = info
- s = socket.socket(af, socktype, proto)
- with contextlib.closing(s):
- s.bind(sa)
- for ip in (addr.address, addr.netmask, addr.broadcast):
- if ip is not None:
- # TODO: skip AF_INET6 for now because I get:
- # AddressValueError: Only hex digits permitted in
- # u'c6f3%lxcbr0' in u'fe80::c8e0:fff:fe54:c6f3%lxcbr0'
- if addr.family != AF_INET6:
- check_ip_address(ip, addr.family)
-
- if BSD or OSX or SUNOS:
- if hasattr(socket, "AF_LINK"):
- self.assertEqual(psutil.AF_LINK, socket.AF_LINK)
- elif LINUX:
- self.assertEqual(psutil.AF_LINK, socket.AF_PACKET)
- elif WINDOWS:
- self.assertEqual(psutil.AF_LINK, -1)
-
- @unittest.skipIf(TRAVIS, "EPERM on travis")
- def test_net_if_stats(self):
- nics = psutil.net_if_stats()
- assert nics, nics
- all_duplexes = (psutil.NIC_DUPLEX_FULL,
- psutil.NIC_DUPLEX_HALF,
- psutil.NIC_DUPLEX_UNKNOWN)
- for nic, stats in nics.items():
- isup, duplex, speed, mtu = stats
- self.assertIsInstance(isup, bool)
- self.assertIn(duplex, all_duplexes)
- self.assertIn(duplex, all_duplexes)
- self.assertGreaterEqual(speed, 0)
- self.assertGreaterEqual(mtu, 0)
-
- @unittest.skipIf(LINUX and not os.path.exists('/proc/diskstats'),
- '/proc/diskstats not available on this linux version')
- @unittest.skipIf(APPVEYOR,
- "can't find any physical disk on Appveyor")
- def test_disk_io_counters(self):
- def check_ntuple(nt):
- self.assertEqual(nt[0], nt.read_count)
- self.assertEqual(nt[1], nt.write_count)
- self.assertEqual(nt[2], nt.read_bytes)
- self.assertEqual(nt[3], nt.write_bytes)
- self.assertEqual(nt[4], nt.read_time)
- self.assertEqual(nt[5], nt.write_time)
- assert nt.read_count >= 0, nt
- assert nt.write_count >= 0, nt
- assert nt.read_bytes >= 0, nt
- assert nt.write_bytes >= 0, nt
- assert nt.read_time >= 0, nt
- assert nt.write_time >= 0, nt
-
- ret = psutil.disk_io_counters(perdisk=False)
- check_ntuple(ret)
- ret = psutil.disk_io_counters(perdisk=True)
- # make sure there are no duplicates
- self.assertEqual(len(ret), len(set(ret)))
- for key in ret:
- assert key, key
- check_ntuple(ret[key])
- if LINUX and key[-1].isdigit():
- # if 'sda1' is listed 'sda' shouldn't, see:
- # https://github.com/giampaolo/psutil/issues/338
- while key[-1].isdigit():
- key = key[:-1]
- self.assertNotIn(key, ret.keys())
-
- def test_users(self):
- users = psutil.users()
- if not APPVEYOR:
- self.assertNotEqual(users, [])
- for user in users:
- assert user.name, user
- user.terminal
- user.host
- assert user.started > 0.0, user
- datetime.datetime.fromtimestamp(user.started)
-
-
-# ===================================================================
-# --- psutil.Process class tests
-# ===================================================================
-
-class TestProcess(unittest.TestCase):
- """Tests for psutil.Process class."""
-
- def setUp(self):
- safe_remove(TESTFN)
-
- def tearDown(self):
- reap_children()
-
- def test_pid(self):
- self.assertEqual(psutil.Process().pid, os.getpid())
- sproc = get_test_subprocess()
- self.assertEqual(psutil.Process(sproc.pid).pid, sproc.pid)
-
- def test_kill(self):
- sproc = get_test_subprocess(wait=True)
- test_pid = sproc.pid
- p = psutil.Process(test_pid)
- p.kill()
- sig = p.wait()
- self.assertFalse(psutil.pid_exists(test_pid))
- if POSIX:
- self.assertEqual(sig, signal.SIGKILL)
-
- def test_terminate(self):
- sproc = get_test_subprocess(wait=True)
- test_pid = sproc.pid
- p = psutil.Process(test_pid)
- p.terminate()
- sig = p.wait()
- self.assertFalse(psutil.pid_exists(test_pid))
- if POSIX:
- self.assertEqual(sig, signal.SIGTERM)
-
- def test_send_signal(self):
- sig = signal.SIGKILL if POSIX else signal.SIGTERM
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- p.send_signal(sig)
- exit_sig = p.wait()
- self.assertFalse(psutil.pid_exists(p.pid))
- if POSIX:
- self.assertEqual(exit_sig, sig)
- #
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- p.send_signal(sig)
- with mock.patch('psutil.os.kill',
- side_effect=OSError(errno.ESRCH, "")) as fun:
- with self.assertRaises(psutil.NoSuchProcess):
- p.send_signal(sig)
- assert fun.called
- #
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- p.send_signal(sig)
- with mock.patch('psutil.os.kill',
- side_effect=OSError(errno.EPERM, "")) as fun:
- with self.assertRaises(psutil.AccessDenied):
- p.send_signal(sig)
- assert fun.called
-
- def test_wait(self):
- # check exit code signal
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- p.kill()
- code = p.wait()
- if POSIX:
- self.assertEqual(code, signal.SIGKILL)
- else:
- self.assertEqual(code, 0)
- self.assertFalse(p.is_running())
-
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- p.terminate()
- code = p.wait()
- if POSIX:
- self.assertEqual(code, signal.SIGTERM)
- else:
- self.assertEqual(code, 0)
- self.assertFalse(p.is_running())
-
- # check sys.exit() code
- code = "import time, sys; time.sleep(0.01); sys.exit(5);"
- sproc = get_test_subprocess([PYTHON, "-c", code])
- p = psutil.Process(sproc.pid)
- self.assertEqual(p.wait(), 5)
- self.assertFalse(p.is_running())
-
- # Test wait() issued twice.
- # It is not supposed to raise NSP when the process is gone.
- # On UNIX this should return None, on Windows it should keep
- # returning the exit code.
- sproc = get_test_subprocess([PYTHON, "-c", code])
- p = psutil.Process(sproc.pid)
- self.assertEqual(p.wait(), 5)
- self.assertIn(p.wait(), (5, None))
-
- # test timeout
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- p.name()
- self.assertRaises(psutil.TimeoutExpired, p.wait, 0.01)
-
- # timeout < 0 not allowed
- self.assertRaises(ValueError, p.wait, -1)
-
- # XXX why is this skipped on Windows?
- @unittest.skipUnless(POSIX, 'skipped on Windows')
- def test_wait_non_children(self):
- # test wait() against processes which are not our children
- code = "import sys;"
- code += "from subprocess import Popen, PIPE;"
- code += "cmd = ['%s', '-c', 'import time; time.sleep(60)'];" % PYTHON
- code += "sp = Popen(cmd, stdout=PIPE);"
- code += "sys.stdout.write(str(sp.pid));"
- sproc = get_test_subprocess([PYTHON, "-c", code],
- stdout=subprocess.PIPE)
- grandson_pid = int(sproc.stdout.read())
- grandson_proc = psutil.Process(grandson_pid)
- try:
- self.assertRaises(psutil.TimeoutExpired, grandson_proc.wait, 0.01)
- grandson_proc.kill()
- ret = grandson_proc.wait()
- self.assertEqual(ret, None)
- finally:
- if grandson_proc.is_running():
- grandson_proc.kill()
- grandson_proc.wait()
-
- def test_wait_timeout_0(self):
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- self.assertRaises(psutil.TimeoutExpired, p.wait, 0)
- p.kill()
- stop_at = time.time() + 2
- while True:
- try:
- code = p.wait(0)
- except psutil.TimeoutExpired:
- if time.time() >= stop_at:
- raise
- else:
- break
- if POSIX:
- self.assertEqual(code, signal.SIGKILL)
- else:
- self.assertEqual(code, 0)
- self.assertFalse(p.is_running())
-
- def test_cpu_percent(self):
- p = psutil.Process()
- p.cpu_percent(interval=0.001)
- p.cpu_percent(interval=0.001)
- for x in range(100):
- percent = p.cpu_percent(interval=None)
- self.assertIsInstance(percent, float)
- self.assertGreaterEqual(percent, 0.0)
- if not POSIX:
- self.assertLessEqual(percent, 100.0)
- else:
- self.assertGreaterEqual(percent, 0.0)
-
- def test_cpu_times(self):
- times = psutil.Process().cpu_times()
- assert (times.user > 0.0) or (times.system > 0.0), times
- # make sure returned values can be pretty printed with strftime
- time.strftime("%H:%M:%S", time.localtime(times.user))
- time.strftime("%H:%M:%S", time.localtime(times.system))
-
- # Test Process.cpu_times() against os.times()
- # os.times() is broken on Python 2.6
- # http://bugs.python.org/issue1040026
- # XXX fails on OSX: not sure if it's for os.times(). We should
- # try this with Python 2.7 and re-enable the test.
-
- @unittest.skipUnless(sys.version_info > (2, 6, 1) and not OSX,
- 'os.times() is not reliable on this Python version')
- def test_cpu_times2(self):
- user_time, kernel_time = psutil.Process().cpu_times()
- utime, ktime = os.times()[:2]
-
- # Use os.times()[:2] as base values to compare our results
- # using a tolerance of +/- 0.1 seconds.
- # It will fail if the difference between the values is > 0.1s.
- if (max([user_time, utime]) - min([user_time, utime])) > 0.1:
- self.fail("expected: %s, found: %s" % (utime, user_time))
-
- if (max([kernel_time, ktime]) - min([kernel_time, ktime])) > 0.1:
- self.fail("expected: %s, found: %s" % (ktime, kernel_time))
-
- def test_create_time(self):
- sproc = get_test_subprocess(wait=True)
- now = time.time()
- p = psutil.Process(sproc.pid)
- create_time = p.create_time()
-
- # Use time.time() as base value to compare our result using a
- # tolerance of +/- 1 second.
- # It will fail if the difference between the values is > 2s.
- difference = abs(create_time - now)
- if difference > 2:
- self.fail("expected: %s, found: %s, difference: %s"
- % (now, create_time, difference))
-
- # make sure returned value can be pretty printed with strftime
- time.strftime("%Y %m %d %H:%M:%S", time.localtime(p.create_time()))
-
- @unittest.skipIf(WINDOWS, 'Windows only')
- def test_terminal(self):
- terminal = psutil.Process().terminal()
- if sys.stdin.isatty():
- self.assertEqual(terminal, sh('tty'))
- else:
- assert terminal, repr(terminal)
-
- @unittest.skipUnless(LINUX or BSD or WINDOWS,
- 'not available on this platform')
- @skip_on_not_implemented(only_if=LINUX)
- def test_io_counters(self):
- p = psutil.Process()
- # test reads
- io1 = p.io_counters()
- with open(PYTHON, 'rb') as f:
- f.read()
- io2 = p.io_counters()
- if not BSD:
- assert io2.read_count > io1.read_count, (io1, io2)
- self.assertEqual(io2.write_count, io1.write_count)
- assert io2.read_bytes >= io1.read_bytes, (io1, io2)
- assert io2.write_bytes >= io1.write_bytes, (io1, io2)
- # test writes
- io1 = p.io_counters()
- with tempfile.TemporaryFile(prefix=TESTFILE_PREFIX) as f:
- if PY3:
- f.write(bytes("x" * 1000000, 'ascii'))
- else:
- f.write("x" * 1000000)
- io2 = p.io_counters()
- assert io2.write_count >= io1.write_count, (io1, io2)
- assert io2.write_bytes >= io1.write_bytes, (io1, io2)
- assert io2.read_count >= io1.read_count, (io1, io2)
- assert io2.read_bytes >= io1.read_bytes, (io1, io2)
-
- @unittest.skipUnless(LINUX or (WINDOWS and get_winver() >= WIN_VISTA),
- 'Linux and Windows Vista only')
- @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
- def test_ionice(self):
- if LINUX:
- from psutil import (IOPRIO_CLASS_NONE, IOPRIO_CLASS_RT,
- IOPRIO_CLASS_BE, IOPRIO_CLASS_IDLE)
- self.assertEqual(IOPRIO_CLASS_NONE, 0)
- self.assertEqual(IOPRIO_CLASS_RT, 1)
- self.assertEqual(IOPRIO_CLASS_BE, 2)
- self.assertEqual(IOPRIO_CLASS_IDLE, 3)
- p = psutil.Process()
- try:
- p.ionice(2)
- ioclass, value = p.ionice()
- if enum is not None:
- self.assertIsInstance(ioclass, enum.IntEnum)
- self.assertEqual(ioclass, 2)
- self.assertEqual(value, 4)
- #
- p.ionice(3)
- ioclass, value = p.ionice()
- self.assertEqual(ioclass, 3)
- self.assertEqual(value, 0)
- #
- p.ionice(2, 0)
- ioclass, value = p.ionice()
- self.assertEqual(ioclass, 2)
- self.assertEqual(value, 0)
- p.ionice(2, 7)
- ioclass, value = p.ionice()
- self.assertEqual(ioclass, 2)
- self.assertEqual(value, 7)
- #
- self.assertRaises(ValueError, p.ionice, 2, 10)
- self.assertRaises(ValueError, p.ionice, 2, -1)
- self.assertRaises(ValueError, p.ionice, 4)
- self.assertRaises(TypeError, p.ionice, 2, "foo")
- self.assertRaisesRegexp(
- ValueError, "can't specify value with IOPRIO_CLASS_NONE",
- p.ionice, psutil.IOPRIO_CLASS_NONE, 1)
- self.assertRaisesRegexp(
- ValueError, "can't specify value with IOPRIO_CLASS_IDLE",
- p.ionice, psutil.IOPRIO_CLASS_IDLE, 1)
- self.assertRaisesRegexp(
- ValueError, "'ioclass' argument must be specified",
- p.ionice, value=1)
- finally:
- p.ionice(IOPRIO_CLASS_NONE)
- else:
- p = psutil.Process()
- original = p.ionice()
- self.assertIsInstance(original, int)
- try:
- value = 0 # very low
- if original == value:
- value = 1 # low
- p.ionice(value)
- self.assertEqual(p.ionice(), value)
- finally:
- p.ionice(original)
- #
- self.assertRaises(ValueError, p.ionice, 3)
- self.assertRaises(TypeError, p.ionice, 2, 1)
-
- @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
- "only available on Linux >= 2.6.36")
- def test_rlimit_get(self):
- import resource
- p = psutil.Process(os.getpid())
- names = [x for x in dir(psutil) if x.startswith('RLIMIT')]
- assert names, names
- for name in names:
- value = getattr(psutil, name)
- self.assertGreaterEqual(value, 0)
- if name in dir(resource):
- self.assertEqual(value, getattr(resource, name))
- self.assertEqual(p.rlimit(value), resource.getrlimit(value))
- else:
- ret = p.rlimit(value)
- self.assertEqual(len(ret), 2)
- self.assertGreaterEqual(ret[0], -1)
- self.assertGreaterEqual(ret[1], -1)
-
- @unittest.skipUnless(LINUX and RLIMIT_SUPPORT,
- "only available on Linux >= 2.6.36")
- def test_rlimit_set(self):
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- p.rlimit(psutil.RLIMIT_NOFILE, (5, 5))
- self.assertEqual(p.rlimit(psutil.RLIMIT_NOFILE), (5, 5))
- # If pid is 0 prlimit() applies to the calling process and
- # we don't want that.
- with self.assertRaises(ValueError):
- psutil._psplatform.Process(0).rlimit(0)
- with self.assertRaises(ValueError):
- p.rlimit(psutil.RLIMIT_NOFILE, (5, 5, 5))
-
- def test_num_threads(self):
- # on certain platforms such as Linux we might test for exact
- # thread number, since we always have with 1 thread per process,
- # but this does not apply across all platforms (OSX, Windows)
- p = psutil.Process()
- step1 = p.num_threads()
-
- thread = ThreadTask()
- thread.start()
- try:
- step2 = p.num_threads()
- self.assertEqual(step2, step1 + 1)
- thread.stop()
- finally:
- if thread._running:
- thread.stop()
-
- @unittest.skipUnless(WINDOWS, 'Windows only')
- def test_num_handles(self):
- # a better test is done later into test/_windows.py
- p = psutil.Process()
- self.assertGreater(p.num_handles(), 0)
-
- def test_threads(self):
- p = psutil.Process()
- step1 = p.threads()
-
- thread = ThreadTask()
- thread.start()
-
- try:
- step2 = p.threads()
- self.assertEqual(len(step2), len(step1) + 1)
- # on Linux, first thread id is supposed to be this process
- if LINUX:
- self.assertEqual(step2[0].id, os.getpid())
- athread = step2[0]
- # test named tuple
- self.assertEqual(athread.id, athread[0])
- self.assertEqual(athread.user_time, athread[1])
- self.assertEqual(athread.system_time, athread[2])
- # test num threads
- thread.stop()
- finally:
- if thread._running:
- thread.stop()
-
- def test_memory_info(self):
- p = psutil.Process()
-
- # step 1 - get a base value to compare our results
- rss1, vms1 = p.memory_info()
- percent1 = p.memory_percent()
- self.assertGreater(rss1, 0)
- self.assertGreater(vms1, 0)
-
- # step 2 - allocate some memory
- memarr = [None] * 1500000
-
- rss2, vms2 = p.memory_info()
- percent2 = p.memory_percent()
- # make sure that the memory usage bumped up
- self.assertGreater(rss2, rss1)
- self.assertGreaterEqual(vms2, vms1) # vms might be equal
- self.assertGreater(percent2, percent1)
- del memarr
-
- # def test_memory_info_ex(self):
- # # tested later in fetch all test suite
-
- def test_memory_maps(self):
- p = psutil.Process()
- maps = p.memory_maps()
- paths = [x for x in maps]
- self.assertEqual(len(paths), len(set(paths)))
- ext_maps = p.memory_maps(grouped=False)
-
- for nt in maps:
- if not nt.path.startswith('['):
- assert os.path.isabs(nt.path), nt.path
- if POSIX:
- assert os.path.exists(nt.path), nt.path
- else:
- # XXX - On Windows we have this strange behavior with
- # 64 bit dlls: they are visible via explorer but cannot
- # be accessed via os.stat() (wtf?).
- if '64' not in os.path.basename(nt.path):
- assert os.path.exists(nt.path), nt.path
- for nt in ext_maps:
- for fname in nt._fields:
- value = getattr(nt, fname)
- if fname == 'path':
- continue
- elif fname in ('addr', 'perms'):
- assert value, value
- else:
- self.assertIsInstance(value, (int, long))
- assert value >= 0, value
-
- def test_memory_percent(self):
- p = psutil.Process()
- self.assertGreater(p.memory_percent(), 0.0)
-
- def test_is_running(self):
- sproc = get_test_subprocess(wait=True)
- p = psutil.Process(sproc.pid)
- assert p.is_running()
- assert p.is_running()
- p.kill()
- p.wait()
- assert not p.is_running()
- assert not p.is_running()
-
- def test_exe(self):
- sproc = get_test_subprocess(wait=True)
- exe = psutil.Process(sproc.pid).exe()
- try:
- self.assertEqual(exe, PYTHON)
- except AssertionError:
- if WINDOWS and len(exe) == len(PYTHON):
- # on Windows we don't care about case sensitivity
- self.assertEqual(exe.lower(), PYTHON.lower())
- else:
- # certain platforms such as BSD are more accurate returning:
- # "/usr/local/bin/python2.7"
- # ...instead of:
- # "/usr/local/bin/python"
- # We do not want to consider this difference in accuracy
- # an error.
- ver = "%s.%s" % (sys.version_info[0], sys.version_info[1])
- self.assertEqual(exe.replace(ver, ''), PYTHON.replace(ver, ''))
-
- def test_cmdline(self):
- cmdline = [PYTHON, "-c", "import time; time.sleep(60)"]
- sproc = get_test_subprocess(cmdline, wait=True)
- self.assertEqual(' '.join(psutil.Process(sproc.pid).cmdline()),
- ' '.join(cmdline))
-
- def test_name(self):
- sproc = get_test_subprocess(PYTHON, wait=True)
- name = psutil.Process(sproc.pid).name().lower()
- pyexe = os.path.basename(os.path.realpath(sys.executable)).lower()
- assert pyexe.startswith(name), (pyexe, name)
-
- @unittest.skipUnless(POSIX, "posix only")
- # TODO: add support for other compilers
- @unittest.skipUnless(which("gcc"), "gcc not available")
- def test_prog_w_funky_name(self):
- # Test that name(), exe() and cmdline() correctly handle programs
- # with funky chars such as spaces and ")", see:
- # https://github.com/giampaolo/psutil/issues/628
- funky_name = "/tmp/foo bar )"
- _, c_file = tempfile.mkstemp(prefix='psutil-', suffix='.c', dir="/tmp")
- self.addCleanup(lambda: safe_remove(c_file))
- self.addCleanup(lambda: safe_remove(funky_name))
- with open(c_file, "w") as f:
- f.write("void main() { pause(); }")
- subprocess.check_call(["gcc", c_file, "-o", funky_name])
- sproc = get_test_subprocess(
- [funky_name, "arg1", "arg2", "", "arg3", ""])
- p = psutil.Process(sproc.pid)
- # ...in order to try to prevent occasional failures on travis
- wait_for_pid(p.pid)
- self.assertEqual(p.name(), "foo bar )")
- self.assertEqual(p.exe(), "/tmp/foo bar )")
- self.assertEqual(
- p.cmdline(), ["/tmp/foo bar )", "arg1", "arg2", "", "arg3", ""])
-
- @unittest.skipUnless(POSIX, 'posix only')
- def test_uids(self):
- p = psutil.Process()
- real, effective, saved = p.uids()
- # os.getuid() refers to "real" uid
- self.assertEqual(real, os.getuid())
- # os.geteuid() refers to "effective" uid
- self.assertEqual(effective, os.geteuid())
- # no such thing as os.getsuid() ("saved" uid), but starting
- # from python 2.7 we have os.getresuid()[2]
- if hasattr(os, "getresuid"):
- self.assertEqual(saved, os.getresuid()[2])
-
- @unittest.skipUnless(POSIX, 'posix only')
- def test_gids(self):
- p = psutil.Process()
- real, effective, saved = p.gids()
- # os.getuid() refers to "real" uid
- self.assertEqual(real, os.getgid())
- # os.geteuid() refers to "effective" uid
- self.assertEqual(effective, os.getegid())
- # no such thing as os.getsuid() ("saved" uid), but starting
- # from python 2.7 we have os.getresgid()[2]
- if hasattr(os, "getresuid"):
- self.assertEqual(saved, os.getresgid()[2])
-
- def test_nice(self):
- p = psutil.Process()
- self.assertRaises(TypeError, p.nice, "str")
- if WINDOWS:
- try:
- init = p.nice()
- if sys.version_info > (3, 4):
- self.assertIsInstance(init, enum.IntEnum)
- else:
- self.assertIsInstance(init, int)
- self.assertEqual(init, psutil.NORMAL_PRIORITY_CLASS)
- p.nice(psutil.HIGH_PRIORITY_CLASS)
- self.assertEqual(p.nice(), psutil.HIGH_PRIORITY_CLASS)
- p.nice(psutil.NORMAL_PRIORITY_CLASS)
- self.assertEqual(p.nice(), psutil.NORMAL_PRIORITY_CLASS)
- finally:
- p.nice(psutil.NORMAL_PRIORITY_CLASS)
- else:
- try:
- first_nice = p.nice()
- p.nice(1)
- self.assertEqual(p.nice(), 1)
- # going back to previous nice value raises
- # AccessDenied on OSX
- if not OSX:
- p.nice(0)
- self.assertEqual(p.nice(), 0)
- except psutil.AccessDenied:
- pass
- finally:
- try:
- p.nice(first_nice)
- except psutil.AccessDenied:
- pass
-
- def test_status(self):
- p = psutil.Process()
- self.assertEqual(p.status(), psutil.STATUS_RUNNING)
-
- def test_username(self):
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- if POSIX:
- import pwd
- self.assertEqual(p.username(), pwd.getpwuid(os.getuid()).pw_name)
- with mock.patch("psutil.pwd.getpwuid",
- side_effect=KeyError) as fun:
- p.username() == str(p.uids().real)
- assert fun.called
-
- elif WINDOWS and 'USERNAME' in os.environ:
- expected_username = os.environ['USERNAME']
- expected_domain = os.environ['USERDOMAIN']
- domain, username = p.username().split('\\')
- self.assertEqual(domain, expected_domain)
- self.assertEqual(username, expected_username)
- else:
- p.username()
-
- def test_cwd(self):
- sproc = get_test_subprocess(wait=True)
- p = psutil.Process(sproc.pid)
- self.assertEqual(p.cwd(), os.getcwd())
-
- def test_cwd_2(self):
- cmd = [PYTHON, "-c", "import os, time; os.chdir('..'); time.sleep(60)"]
- sproc = get_test_subprocess(cmd, wait=True)
- p = psutil.Process(sproc.pid)
- call_until(p.cwd, "ret == os.path.dirname(os.getcwd())")
-
- @unittest.skipUnless(WINDOWS or LINUX or BSD,
- 'not available on this platform')
- @unittest.skipIf(LINUX and TRAVIS, "unknown failure on travis")
- def test_cpu_affinity(self):
- p = psutil.Process()
- initial = p.cpu_affinity()
- if hasattr(os, "sched_getaffinity"):
- self.assertEqual(initial, list(os.sched_getaffinity(p.pid)))
- self.assertEqual(len(initial), len(set(initial)))
- all_cpus = list(range(len(psutil.cpu_percent(percpu=True))))
- # setting on travis doesn't seem to work (always return all
- # CPUs on get):
- # AssertionError: Lists differ: [0, 1, 2, 3, 4, 5, 6, ... != [0]
- for n in all_cpus:
- p.cpu_affinity([n])
- self.assertEqual(p.cpu_affinity(), [n])
- if hasattr(os, "sched_getaffinity"):
- self.assertEqual(p.cpu_affinity(),
- list(os.sched_getaffinity(p.pid)))
- #
- p.cpu_affinity(all_cpus)
- self.assertEqual(p.cpu_affinity(), all_cpus)
- if hasattr(os, "sched_getaffinity"):
- self.assertEqual(p.cpu_affinity(),
- list(os.sched_getaffinity(p.pid)))
- #
- self.assertRaises(TypeError, p.cpu_affinity, 1)
- p.cpu_affinity(initial)
- # it should work with all iterables, not only lists
- p.cpu_affinity(set(all_cpus))
- p.cpu_affinity(tuple(all_cpus))
- invalid_cpu = [len(psutil.cpu_times(percpu=True)) + 10]
- self.assertRaises(ValueError, p.cpu_affinity, invalid_cpu)
- self.assertRaises(ValueError, p.cpu_affinity, range(10000, 11000))
- self.assertRaises(TypeError, p.cpu_affinity, [0, "1"])
-
- # TODO
- @unittest.skipIf(BSD, "broken on BSD, see #595")
- @unittest.skipIf(APPVEYOR,
- "can't find any process file on Appveyor")
- def test_open_files(self):
- # current process
- p = psutil.Process()
- files = p.open_files()
- self.assertFalse(TESTFN in files)
- with open(TESTFN, 'w'):
- # give the kernel some time to see the new file
- call_until(p.open_files, "len(ret) != %i" % len(files))
- filenames = [x.path for x in p.open_files()]
- self.assertIn(TESTFN, filenames)
- for file in filenames:
- assert os.path.isfile(file), file
-
- # another process
- cmdline = "import time; f = open(r'%s', 'r'); time.sleep(60);" % TESTFN
- sproc = get_test_subprocess([PYTHON, "-c", cmdline], wait=True)
- p = psutil.Process(sproc.pid)
-
- for x in range(100):
- filenames = [x.path for x in p.open_files()]
- if TESTFN in filenames:
- break
- time.sleep(.01)
- else:
- self.assertIn(TESTFN, filenames)
- for file in filenames:
- assert os.path.isfile(file), file
-
- # TODO
- @unittest.skipIf(BSD, "broken on BSD, see #595")
- @unittest.skipIf(APPVEYOR,
- "can't find any process file on Appveyor")
- def test_open_files2(self):
- # test fd and path fields
- with open(TESTFN, 'w') as fileobj:
- p = psutil.Process()
- for path, fd in p.open_files():
- if path == fileobj.name or fd == fileobj.fileno():
- break
- else:
- self.fail("no file found; files=%s" % repr(p.open_files()))
- self.assertEqual(path, fileobj.name)
- if WINDOWS:
- self.assertEqual(fd, -1)
- else:
- self.assertEqual(fd, fileobj.fileno())
- # test positions
- ntuple = p.open_files()[0]
- self.assertEqual(ntuple[0], ntuple.path)
- self.assertEqual(ntuple[1], ntuple.fd)
- # test file is gone
- self.assertTrue(fileobj.name not in p.open_files())
-
- def compare_proc_sys_cons(self, pid, proc_cons):
- from psutil._common import pconn
- sys_cons = []
- for c in psutil.net_connections(kind='all'):
- if c.pid == pid:
- sys_cons.append(pconn(*c[:-1]))
- if BSD:
- # on BSD all fds are set to -1
- proc_cons = [pconn(*[-1] + list(x[1:])) for x in proc_cons]
- self.assertEqual(sorted(proc_cons), sorted(sys_cons))
-
- @skip_on_access_denied(only_if=OSX)
- def test_connections(self):
- def check_conn(proc, conn, family, type, laddr, raddr, status, kinds):
- all_kinds = ("all", "inet", "inet4", "inet6", "tcp", "tcp4",
- "tcp6", "udp", "udp4", "udp6")
- check_connection_ntuple(conn)
- self.assertEqual(conn.family, family)
- self.assertEqual(conn.type, type)
- self.assertEqual(conn.laddr, laddr)
- self.assertEqual(conn.raddr, raddr)
- self.assertEqual(conn.status, status)
- for kind in all_kinds:
- cons = proc.connections(kind=kind)
- if kind in kinds:
- self.assertNotEqual(cons, [])
- else:
- self.assertEqual(cons, [])
- # compare against system-wide connections
- # XXX Solaris can't retrieve system-wide UNIX
- # sockets.
- if not SUNOS:
- self.compare_proc_sys_cons(proc.pid, [conn])
-
- tcp_template = textwrap.dedent("""
- import socket, time
- s = socket.socket($family, socket.SOCK_STREAM)
- s.bind(('$addr', 0))
- s.listen(1)
- with open('$testfn', 'w') as f:
- f.write(str(s.getsockname()[:2]))
- time.sleep(60)
- """)
-
- udp_template = textwrap.dedent("""
- import socket, time
- s = socket.socket($family, socket.SOCK_DGRAM)
- s.bind(('$addr', 0))
- with open('$testfn', 'w') as f:
- f.write(str(s.getsockname()[:2]))
- time.sleep(60)
- """)
-
- from string import Template
- testfile = os.path.basename(TESTFN)
- tcp4_template = Template(tcp_template).substitute(
- family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
- udp4_template = Template(udp_template).substitute(
- family=int(AF_INET), addr="127.0.0.1", testfn=testfile)
- tcp6_template = Template(tcp_template).substitute(
- family=int(AF_INET6), addr="::1", testfn=testfile)
- udp6_template = Template(udp_template).substitute(
- family=int(AF_INET6), addr="::1", testfn=testfile)
-
- # launch various subprocess instantiating a socket of various
- # families and types to enrich psutil results
- tcp4_proc = pyrun(tcp4_template)
- tcp4_addr = eval(wait_for_file(testfile))
- udp4_proc = pyrun(udp4_template)
- udp4_addr = eval(wait_for_file(testfile))
- if supports_ipv6():
- tcp6_proc = pyrun(tcp6_template)
- tcp6_addr = eval(wait_for_file(testfile))
- udp6_proc = pyrun(udp6_template)
- udp6_addr = eval(wait_for_file(testfile))
- else:
- tcp6_proc = None
- udp6_proc = None
- tcp6_addr = None
- udp6_addr = None
-
- for p in psutil.Process().children():
- cons = p.connections()
- self.assertEqual(len(cons), 1)
- for conn in cons:
- # TCP v4
- if p.pid == tcp4_proc.pid:
- check_conn(p, conn, AF_INET, SOCK_STREAM, tcp4_addr, (),
- psutil.CONN_LISTEN,
- ("all", "inet", "inet4", "tcp", "tcp4"))
- # UDP v4
- elif p.pid == udp4_proc.pid:
- check_conn(p, conn, AF_INET, SOCK_DGRAM, udp4_addr, (),
- psutil.CONN_NONE,
- ("all", "inet", "inet4", "udp", "udp4"))
- # TCP v6
- elif p.pid == getattr(tcp6_proc, "pid", None):
- check_conn(p, conn, AF_INET6, SOCK_STREAM, tcp6_addr, (),
- psutil.CONN_LISTEN,
- ("all", "inet", "inet6", "tcp", "tcp6"))
- # UDP v6
- elif p.pid == getattr(udp6_proc, "pid", None):
- check_conn(p, conn, AF_INET6, SOCK_DGRAM, udp6_addr, (),
- psutil.CONN_NONE,
- ("all", "inet", "inet6", "udp", "udp6"))
-
- @unittest.skipUnless(hasattr(socket, 'AF_UNIX'),
- 'AF_UNIX is not supported')
- @skip_on_access_denied(only_if=OSX)
- def test_connections_unix(self):
- def check(type):
- safe_remove(TESTFN)
- sock = socket.socket(AF_UNIX, type)
- with contextlib.closing(sock):
- sock.bind(TESTFN)
- cons = psutil.Process().connections(kind='unix')
- conn = cons[0]
- check_connection_ntuple(conn)
- if conn.fd != -1: # != sunos and windows
- self.assertEqual(conn.fd, sock.fileno())
- self.assertEqual(conn.family, AF_UNIX)
- self.assertEqual(conn.type, type)
- self.assertEqual(conn.laddr, TESTFN)
- if not SUNOS:
- # XXX Solaris can't retrieve system-wide UNIX
- # sockets.
- self.compare_proc_sys_cons(os.getpid(), cons)
-
- check(SOCK_STREAM)
- check(SOCK_DGRAM)
-
- @unittest.skipUnless(hasattr(socket, "fromfd"),
- 'socket.fromfd() is not availble')
- @unittest.skipIf(WINDOWS or SUNOS,
- 'connection fd not available on this platform')
- def test_connection_fromfd(self):
- with contextlib.closing(socket.socket()) as sock:
- sock.bind(('localhost', 0))
- sock.listen(1)
- p = psutil.Process()
- for conn in p.connections():
- if conn.fd == sock.fileno():
- break
- else:
- self.fail("couldn't find socket fd")
- dupsock = socket.fromfd(conn.fd, conn.family, conn.type)
- with contextlib.closing(dupsock):
- self.assertEqual(dupsock.getsockname(), conn.laddr)
- self.assertNotEqual(sock.fileno(), dupsock.fileno())
-
- def test_connection_constants(self):
- ints = []
- strs = []
- for name in dir(psutil):
- if name.startswith('CONN_'):
- num = getattr(psutil, name)
- str_ = str(num)
- assert str_.isupper(), str_
- assert str_ not in strs, str_
- assert num not in ints, num
- ints.append(num)
- strs.append(str_)
- if SUNOS:
- psutil.CONN_IDLE
- psutil.CONN_BOUND
- if WINDOWS:
- psutil.CONN_DELETE_TCB
-
- @unittest.skipUnless(POSIX, 'posix only')
- def test_num_fds(self):
- p = psutil.Process()
- start = p.num_fds()
- file = open(TESTFN, 'w')
- self.addCleanup(file.close)
- self.assertEqual(p.num_fds(), start + 1)
- sock = socket.socket()
- self.addCleanup(sock.close)
- self.assertEqual(p.num_fds(), start + 2)
- file.close()
- sock.close()
- self.assertEqual(p.num_fds(), start)
-
- @skip_on_not_implemented(only_if=LINUX)
- def test_num_ctx_switches(self):
- p = psutil.Process()
- before = sum(p.num_ctx_switches())
- for x in range(500000):
- after = sum(p.num_ctx_switches())
- if after > before:
- return
- self.fail("num ctx switches still the same after 50.000 iterations")
-
- def test_parent_ppid(self):
- this_parent = os.getpid()
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- self.assertEqual(p.ppid(), this_parent)
- self.assertEqual(p.parent().pid, this_parent)
- # no other process is supposed to have us as parent
- for p in psutil.process_iter():
- if p.pid == sproc.pid:
- continue
- self.assertTrue(p.ppid() != this_parent)
-
- def test_children(self):
- p = psutil.Process()
- self.assertEqual(p.children(), [])
- self.assertEqual(p.children(recursive=True), [])
- sproc = get_test_subprocess()
- children1 = p.children()
- children2 = p.children(recursive=True)
- for children in (children1, children2):
- self.assertEqual(len(children), 1)
- self.assertEqual(children[0].pid, sproc.pid)
- self.assertEqual(children[0].ppid(), os.getpid())
-
- def test_children_recursive(self):
- # here we create a subprocess which creates another one as in:
- # A (parent) -> B (child) -> C (grandchild)
- s = "import subprocess, os, sys, time;"
- s += "PYTHON = os.path.realpath(sys.executable);"
- s += "cmd = [PYTHON, '-c', 'import time; time.sleep(60);'];"
- s += "subprocess.Popen(cmd);"
- s += "time.sleep(60);"
- get_test_subprocess(cmd=[PYTHON, "-c", s])
- p = psutil.Process()
- self.assertEqual(len(p.children(recursive=False)), 1)
- # give the grandchild some time to start
- stop_at = time.time() + GLOBAL_TIMEOUT
- while time.time() < stop_at:
- children = p.children(recursive=True)
- if len(children) > 1:
- break
- self.assertEqual(len(children), 2)
- self.assertEqual(children[0].ppid(), os.getpid())
- self.assertEqual(children[1].ppid(), children[0].pid)
-
- def test_children_duplicates(self):
- # find the process which has the highest number of children
- table = collections.defaultdict(int)
- for p in psutil.process_iter():
- try:
- table[p.ppid()] += 1
- except psutil.Error:
- pass
- # this is the one, now let's make sure there are no duplicates
- pid = sorted(table.items(), key=lambda x: x[1])[-1][0]
- p = psutil.Process(pid)
- try:
- c = p.children(recursive=True)
- except psutil.AccessDenied: # windows
- pass
- else:
- self.assertEqual(len(c), len(set(c)))
-
- def test_suspend_resume(self):
- sproc = get_test_subprocess(wait=True)
- p = psutil.Process(sproc.pid)
- p.suspend()
- for x in range(100):
- if p.status() == psutil.STATUS_STOPPED:
- break
- time.sleep(0.01)
- p.resume()
- self.assertNotEqual(p.status(), psutil.STATUS_STOPPED)
-
- def test_invalid_pid(self):
- self.assertRaises(TypeError, psutil.Process, "1")
- self.assertRaises(ValueError, psutil.Process, -1)
-
- def test_as_dict(self):
- p = psutil.Process()
- d = p.as_dict(attrs=['exe', 'name'])
- self.assertEqual(sorted(d.keys()), ['exe', 'name'])
-
- p = psutil.Process(min(psutil.pids()))
- d = p.as_dict(attrs=['connections'], ad_value='foo')
- if not isinstance(d['connections'], list):
- self.assertEqual(d['connections'], 'foo')
-
- def test_halfway_terminated_process(self):
- # Test that NoSuchProcess exception gets raised in case the
- # process dies after we create the Process object.
- # Example:
- # >>> proc = Process(1234)
- # >>> time.sleep(2) # time-consuming task, process dies in meantime
- # >>> proc.name()
- # Refers to Issue #15
- sproc = get_test_subprocess()
- p = psutil.Process(sproc.pid)
- p.terminate()
- p.wait()
- if WINDOWS:
- wait_for_pid(p.pid)
- self.assertFalse(p.is_running())
- self.assertFalse(p.pid in psutil.pids())
-
- excluded_names = ['pid', 'is_running', 'wait', 'create_time']
- if LINUX and not RLIMIT_SUPPORT:
- excluded_names.append('rlimit')
- for name in dir(p):
- if (name.startswith('_') or
- name in excluded_names):
- continue
- try:
- meth = getattr(p, name)
- # get/set methods
- if name == 'nice':
- if POSIX:
- ret = meth(1)
- else:
- ret = meth(psutil.NORMAL_PRIORITY_CLASS)
- elif name == 'ionice':
- ret = meth()
- ret = meth(2)
- elif name == 'rlimit':
- ret = meth(psutil.RLIMIT_NOFILE)
- ret = meth(psutil.RLIMIT_NOFILE, (5, 5))
- elif name == 'cpu_affinity':
- ret = meth()
- ret = meth([0])
- elif name == 'send_signal':
- ret = meth(signal.SIGTERM)
- else:
- ret = meth()
- except psutil.ZombieProcess:
- self.fail("ZombieProcess for %r was not supposed to happen" %
- name)
- except psutil.NoSuchProcess:
- pass
- except NotImplementedError:
- pass
- else:
- self.fail(
- "NoSuchProcess exception not raised for %r, retval=%s" % (
- name, ret))
-
- @unittest.skipUnless(POSIX, 'posix only')
- def test_zombie_process(self):
- def succeed_or_zombie_p_exc(fun, *args, **kwargs):
- try:
- fun(*args, **kwargs)
- except (psutil.ZombieProcess, psutil.AccessDenied):
- pass
-
- # Note: in this test we'll be creating two sub processes.
- # Both of them are supposed to be freed / killed by
- # reap_children() as they are attributable to 'us'
- # (os.getpid()) via children(recursive=True).
- src = textwrap.dedent("""\
- import os, sys, time, socket, contextlib
- child_pid = os.fork()
- if child_pid > 0:
- time.sleep(3000)
- else:
- # this is the zombie process
- s = socket.socket(socket.AF_UNIX)
- with contextlib.closing(s):
- s.connect('%s')
- if sys.version_info < (3, ):
- pid = str(os.getpid())
- else:
- pid = bytes(str(os.getpid()), 'ascii')
- s.sendall(pid)
- """ % TESTFN)
- with contextlib.closing(socket.socket(socket.AF_UNIX)) as sock:
- try:
- sock.settimeout(GLOBAL_TIMEOUT)
- sock.bind(TESTFN)
- sock.listen(1)
- pyrun(src)
- conn, _ = sock.accept()
- select.select([conn.fileno()], [], [], GLOBAL_TIMEOUT)
- zpid = int(conn.recv(1024))
- zproc = psutil.Process(zpid)
- call_until(lambda: zproc.status(),
- "ret == psutil.STATUS_ZOMBIE")
- # A zombie process should always be instantiable
- zproc = psutil.Process(zpid)
- # ...and at least its status always be querable
- self.assertEqual(zproc.status(), psutil.STATUS_ZOMBIE)
- # ...and it should be considered 'running'
- self.assertTrue(zproc.is_running())
- # ...and as_dict() shouldn't crash
- zproc.as_dict()
- if hasattr(zproc, "rlimit"):
- succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE)
- succeed_or_zombie_p_exc(zproc.rlimit, psutil.RLIMIT_NOFILE,
- (5, 5))
- # set methods
- succeed_or_zombie_p_exc(zproc.parent)
- if hasattr(zproc, 'cpu_affinity'):
- succeed_or_zombie_p_exc(zproc.cpu_affinity, [0])
- succeed_or_zombie_p_exc(zproc.nice, 0)
- if hasattr(zproc, 'ionice'):
- if LINUX:
- succeed_or_zombie_p_exc(zproc.ionice, 2, 0)
- else:
- succeed_or_zombie_p_exc(zproc.ionice, 0) # Windows
- if hasattr(zproc, 'rlimit'):
- succeed_or_zombie_p_exc(zproc.rlimit,
- psutil.RLIMIT_NOFILE, (5, 5))
- succeed_or_zombie_p_exc(zproc.suspend)
- succeed_or_zombie_p_exc(zproc.resume)
- succeed_or_zombie_p_exc(zproc.terminate)
- succeed_or_zombie_p_exc(zproc.kill)
-
- # ...its parent should 'see' it
- # edit: not true on BSD and OSX
- # descendants = [x.pid for x in psutil.Process().children(
- # recursive=True)]
- # self.assertIn(zpid, descendants)
- # XXX should we also assume ppid be usable? Note: this
- # would be an important use case as the only way to get
- # rid of a zombie is to kill its parent.
- # self.assertEqual(zpid.ppid(), os.getpid())
- # ...and all other APIs should be able to deal with it
- self.assertTrue(psutil.pid_exists(zpid))
- self.assertIn(zpid, psutil.pids())
- self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
- psutil._pmap = {}
- self.assertIn(zpid, [x.pid for x in psutil.process_iter()])
- finally:
- reap_children(search_all=True)
-
- def test_pid_0(self):
- # Process(0) is supposed to work on all platforms except Linux
- if 0 not in psutil.pids():
- self.assertRaises(psutil.NoSuchProcess, psutil.Process, 0)
- return
-
- p = psutil.Process(0)
- self.assertTrue(p.name())
-
- if POSIX:
- try:
- self.assertEqual(p.uids().real, 0)
- self.assertEqual(p.gids().real, 0)
- except psutil.AccessDenied:
- pass
-
- self.assertRaisesRegexp(
- ValueError, "preventing sending signal to process with PID 0",
- p.send_signal, signal.SIGTERM)
-
- self.assertIn(p.ppid(), (0, 1))
- # self.assertEqual(p.exe(), "")
- p.cmdline()
- try:
- p.num_threads()
- except psutil.AccessDenied:
- pass
-
- try:
- p.memory_info()
- except psutil.AccessDenied:
- pass
-
- try:
- if POSIX:
- self.assertEqual(p.username(), 'root')
- elif WINDOWS:
- self.assertEqual(p.username(), 'NT AUTHORITY\\SYSTEM')
- else:
- p.username()
- except psutil.AccessDenied:
- pass
-
- self.assertIn(0, psutil.pids())
- self.assertTrue(psutil.pid_exists(0))
-
- def test_Popen(self):
- # Popen class test
- # XXX this test causes a ResourceWarning on Python 3 because
- # psutil.__subproc instance doesn't get propertly freed.
- # Not sure what to do though.
- cmd = [PYTHON, "-c", "import time; time.sleep(60);"]
- proc = psutil.Popen(cmd, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- try:
- proc.name()
- proc.stdin
- self.assertTrue(hasattr(proc, 'name'))
- self.assertTrue(hasattr(proc, 'stdin'))
- self.assertTrue(dir(proc))
- self.assertRaises(AttributeError, getattr, proc, 'foo')
- finally:
- proc.kill()
- proc.wait()
- self.assertIsNotNone(proc.returncode)
-
-
-# ===================================================================
-# --- Featch all processes test
-# ===================================================================
-
-class TestFetchAllProcesses(unittest.TestCase):
- """Test which iterates over all running processes and performs
- some sanity checks against Process API's returned values.
- """
-
- def setUp(self):
- if POSIX:
- import pwd
- pall = pwd.getpwall()
- self._uids = set([x.pw_uid for x in pall])
- self._usernames = set([x.pw_name for x in pall])
-
- def test_fetch_all(self):
- valid_procs = 0
- excluded_names = set([
- 'send_signal', 'suspend', 'resume', 'terminate', 'kill', 'wait',
- 'as_dict', 'cpu_percent', 'parent', 'children', 'pid'])
- if LINUX and not RLIMIT_SUPPORT:
- excluded_names.add('rlimit')
- attrs = []
- for name in dir(psutil.Process):
- if name.startswith("_"):
- continue
- if name in excluded_names:
- continue
- attrs.append(name)
-
- default = object()
- failures = []
- for name in attrs:
- for p in psutil.process_iter():
- ret = default
- try:
- try:
- args = ()
- attr = getattr(p, name, None)
- if attr is not None and callable(attr):
- if name == 'rlimit':
- args = (psutil.RLIMIT_NOFILE,)
- ret = attr(*args)
- else:
- ret = attr
- valid_procs += 1
- except NotImplementedError:
- msg = "%r was skipped because not implemented" % (
- self.__class__.__name__ + '.test_' + name)
- warn(msg)
- except (psutil.NoSuchProcess, psutil.AccessDenied) as err:
- self.assertEqual(err.pid, p.pid)
- if err.name:
- # make sure exception's name attr is set
- # with the actual process name
- self.assertEqual(err.name, p.name())
- self.assertTrue(str(err))
- self.assertTrue(err.msg)
- else:
- if ret not in (0, 0.0, [], None, ''):
- assert ret, ret
- meth = getattr(self, name)
- meth(ret)
- except Exception as err:
- s = '\n' + '=' * 70 + '\n'
- s += "FAIL: test_%s (proc=%s" % (name, p)
- if ret != default:
- s += ", ret=%s)" % repr(ret)
- s += ')\n'
- s += '-' * 70
- s += "\n%s" % traceback.format_exc()
- s = "\n".join((" " * 4) + i for i in s.splitlines())
- failures.append(s)
- break
-
- if failures:
- self.fail(''.join(failures))
-
- # we should always have a non-empty list, not including PID 0 etc.
- # special cases.
- self.assertTrue(valid_procs > 0)
-
- def cmdline(self, ret):
- pass
-
- def exe(self, ret):
- if not ret:
- self.assertEqual(ret, '')
- else:
- assert os.path.isabs(ret), ret
- # Note: os.stat() may return False even if the file is there
- # hence we skip the test, see:
- # http://stackoverflow.com/questions/3112546/os-path-exists-lies
- if POSIX and os.path.isfile(ret):
- if hasattr(os, 'access') and hasattr(os, "X_OK"):
- # XXX may fail on OSX
- self.assertTrue(os.access(ret, os.X_OK))
-
- def ppid(self, ret):
- self.assertTrue(ret >= 0)
-
- def name(self, ret):
- self.assertIsInstance(ret, (str, unicode))
- self.assertTrue(ret)
-
- def create_time(self, ret):
- self.assertTrue(ret > 0)
- # this can't be taken for granted on all platforms
- # self.assertGreaterEqual(ret, psutil.boot_time())
- # make sure returned value can be pretty printed
- # with strftime
- time.strftime("%Y %m %d %H:%M:%S", time.localtime(ret))
-
- def uids(self, ret):
- for uid in ret:
- self.assertTrue(uid >= 0)
- self.assertIn(uid, self._uids)
-
- def gids(self, ret):
- # note: testing all gids as above seems not to be reliable for
- # gid == 30 (nodoby); not sure why.
- for gid in ret:
- self.assertTrue(gid >= 0)
- # self.assertIn(uid, self.gids
-
- def username(self, ret):
- self.assertTrue(ret)
- if POSIX:
- self.assertIn(ret, self._usernames)
-
- def status(self, ret):
- self.assertTrue(ret != "")
- self.assertTrue(ret != '?')
- self.assertIn(ret, VALID_PROC_STATUSES)
-
- def io_counters(self, ret):
- for field in ret:
- if field != -1:
- self.assertTrue(field >= 0)
-
- def ionice(self, ret):
- if LINUX:
- self.assertTrue(ret.ioclass >= 0)
- self.assertTrue(ret.value >= 0)
- else:
- self.assertTrue(ret >= 0)
- self.assertIn(ret, (0, 1, 2))
-
- def num_threads(self, ret):
- self.assertTrue(ret >= 1)
-
- def threads(self, ret):
- for t in ret:
- self.assertTrue(t.id >= 0)
- self.assertTrue(t.user_time >= 0)
- self.assertTrue(t.system_time >= 0)
-
- def cpu_times(self, ret):
- self.assertTrue(ret.user >= 0)
- self.assertTrue(ret.system >= 0)
-
- def memory_info(self, ret):
- self.assertTrue(ret.rss >= 0)
- self.assertTrue(ret.vms >= 0)
-
- def memory_info_ex(self, ret):
- for name in ret._fields:
- self.assertTrue(getattr(ret, name) >= 0)
- if POSIX and ret.vms != 0:
- # VMS is always supposed to be the highest
- for name in ret._fields:
- if name != 'vms':
- value = getattr(ret, name)
- assert ret.vms > value, ret
- elif WINDOWS:
- assert ret.peak_wset >= ret.wset, ret
- assert ret.peak_paged_pool >= ret.paged_pool, ret
- assert ret.peak_nonpaged_pool >= ret.nonpaged_pool, ret
- assert ret.peak_pagefile >= ret.pagefile, ret
-
- def open_files(self, ret):
- for f in ret:
- if WINDOWS:
- assert f.fd == -1, f
- else:
- self.assertIsInstance(f.fd, int)
- assert os.path.isabs(f.path), f
- assert os.path.isfile(f.path), f
-
- def num_fds(self, ret):
- self.assertTrue(ret >= 0)
-
- def connections(self, ret):
- self.assertEqual(len(ret), len(set(ret)))
- for conn in ret:
- check_connection_ntuple(conn)
-
- def cwd(self, ret):
- if ret is not None: # BSD may return None
- assert os.path.isabs(ret), ret
- try:
- st = os.stat(ret)
- except OSError as err:
- # directory has been removed in mean time
- if err.errno != errno.ENOENT:
- raise
- else:
- self.assertTrue(stat.S_ISDIR(st.st_mode))
-
- def memory_percent(self, ret):
- assert 0 <= ret <= 100, ret
-
- def is_running(self, ret):
- self.assertTrue(ret)
-
- def cpu_affinity(self, ret):
- assert ret != [], ret
-
- def terminal(self, ret):
- if ret is not None:
- assert os.path.isabs(ret), ret
- assert os.path.exists(ret), ret
-
- def memory_maps(self, ret):
- for nt in ret:
- for fname in nt._fields:
- value = getattr(nt, fname)
- if fname == 'path':
- if not value.startswith('['):
- assert os.path.isabs(nt.path), nt.path
- # commented as on Linux we might get
- # '/foo/bar (deleted)'
- # assert os.path.exists(nt.path), nt.path
- elif fname in ('addr', 'perms'):
- self.assertTrue(value)
- else:
- self.assertIsInstance(value, (int, long))
- assert value >= 0, value
-
- def num_handles(self, ret):
- if WINDOWS:
- self.assertGreaterEqual(ret, 0)
- else:
- self.assertGreaterEqual(ret, 0)
-
- def nice(self, ret):
- if POSIX:
- assert -20 <= ret <= 20, ret
- else:
- priorities = [getattr(psutil, x) for x in dir(psutil)
- if x.endswith('_PRIORITY_CLASS')]
- self.assertIn(ret, priorities)
-
- def num_ctx_switches(self, ret):
- self.assertTrue(ret.voluntary >= 0)
- self.assertTrue(ret.involuntary >= 0)
-
- def rlimit(self, ret):
- self.assertEqual(len(ret), 2)
- self.assertGreaterEqual(ret[0], -1)
- self.assertGreaterEqual(ret[1], -1)
-
-
-# ===================================================================
-# --- Limited user tests
-# ===================================================================
-
-@unittest.skipUnless(POSIX, "UNIX only")
-@unittest.skipUnless(hasattr(os, 'getuid') and os.getuid() == 0,
- "super user privileges are required")
-class LimitedUserTestCase(TestProcess):
- """Repeat the previous tests by using a limited user.
- Executed only on UNIX and only if the user who run the test script
- is root.
- """
- # the uid/gid the test suite runs under
- if hasattr(os, 'getuid'):
- PROCESS_UID = os.getuid()
- PROCESS_GID = os.getgid()
-
- def __init__(self, *args, **kwargs):
- TestProcess.__init__(self, *args, **kwargs)
- # re-define all existent test methods in order to
- # ignore AccessDenied exceptions
- for attr in [x for x in dir(self) if x.startswith('test')]:
- meth = getattr(self, attr)
-
- def test_(self):
- try:
- meth()
- except psutil.AccessDenied:
- pass
- setattr(self, attr, types.MethodType(test_, self))
-
- def setUp(self):
- safe_remove(TESTFN)
- TestProcess.setUp(self)
- os.setegid(1000)
- os.seteuid(1000)
-
- def tearDown(self):
- os.setegid(self.PROCESS_UID)
- os.seteuid(self.PROCESS_GID)
- TestProcess.tearDown(self)
-
- def test_nice(self):
- try:
- psutil.Process().nice(-1)
- except psutil.AccessDenied:
- pass
- else:
- self.fail("exception not raised")
-
- def test_zombie_process(self):
- # causes problems if test test suite is run as root
- pass
-
-
-# ===================================================================
-# --- Misc tests
-# ===================================================================
-
-class TestMisc(unittest.TestCase):
- """Misc / generic tests."""
-
- def test_process__repr__(self, func=repr):
- p = psutil.Process()
- r = func(p)
- self.assertIn("psutil.Process", r)
- self.assertIn("pid=%s" % p.pid, r)
- self.assertIn("name=", r)
- self.assertIn(p.name(), r)
- with mock.patch.object(psutil.Process, "name",
- side_effect=psutil.ZombieProcess(os.getpid())):
- p = psutil.Process()
- r = func(p)
- self.assertIn("pid=%s" % p.pid, r)
- self.assertIn("zombie", r)
- self.assertNotIn("name=", r)
- with mock.patch.object(psutil.Process, "name",
- side_effect=psutil.NoSuchProcess(os.getpid())):
- p = psutil.Process()
- r = func(p)
- self.assertIn("pid=%s" % p.pid, r)
- self.assertIn("terminated", r)
- self.assertNotIn("name=", r)
-
- def test_process__str__(self):
- self.test_process__repr__(func=str)
-
- def test_no_such_process__repr__(self, func=repr):
- self.assertEqual(
- repr(psutil.NoSuchProcess(321)),
- "psutil.NoSuchProcess process no longer exists (pid=321)")
- self.assertEqual(
- repr(psutil.NoSuchProcess(321, name='foo')),
- "psutil.NoSuchProcess process no longer exists (pid=321, "
- "name='foo')")
- self.assertEqual(
- repr(psutil.NoSuchProcess(321, msg='foo')),
- "psutil.NoSuchProcess foo")
-
- def test_zombie_process__repr__(self, func=repr):
- self.assertEqual(
- repr(psutil.ZombieProcess(321)),
- "psutil.ZombieProcess process still exists but it's a zombie "
- "(pid=321)")
- self.assertEqual(
- repr(psutil.ZombieProcess(321, name='foo')),
- "psutil.ZombieProcess process still exists but it's a zombie "
- "(pid=321, name='foo')")
- self.assertEqual(
- repr(psutil.ZombieProcess(321, name='foo', ppid=1)),
- "psutil.ZombieProcess process still exists but it's a zombie "
- "(pid=321, name='foo', ppid=1)")
- self.assertEqual(
- repr(psutil.ZombieProcess(321, msg='foo')),
- "psutil.ZombieProcess foo")
-
- def test_access_denied__repr__(self, func=repr):
- self.assertEqual(
- repr(psutil.AccessDenied(321)),
- "psutil.AccessDenied (pid=321)")
- self.assertEqual(
- repr(psutil.AccessDenied(321, name='foo')),
- "psutil.AccessDenied (pid=321, name='foo')")
- self.assertEqual(
- repr(psutil.AccessDenied(321, msg='foo')),
- "psutil.AccessDenied foo")
-
- def test_timeout_expired__repr__(self, func=repr):
- self.assertEqual(
- repr(psutil.TimeoutExpired(321)),
- "psutil.TimeoutExpired timeout after 321 seconds")
- self.assertEqual(
- repr(psutil.TimeoutExpired(321, pid=111)),
- "psutil.TimeoutExpired timeout after 321 seconds (pid=111)")
- self.assertEqual(
- repr(psutil.TimeoutExpired(321, pid=111, name='foo')),
- "psutil.TimeoutExpired timeout after 321 seconds "
- "(pid=111, name='foo')")
-
- def test_process__eq__(self):
- p1 = psutil.Process()
- p2 = psutil.Process()
- self.assertEqual(p1, p2)
- p2._ident = (0, 0)
- self.assertNotEqual(p1, p2)
- self.assertNotEqual(p1, 'foo')
-
- def test_process__hash__(self):
- s = set([psutil.Process(), psutil.Process()])
- self.assertEqual(len(s), 1)
-
- def test__all__(self):
- dir_psutil = dir(psutil)
- for name in dir_psutil:
- if name in ('callable', 'error', 'namedtuple',
- 'long', 'test', 'NUM_CPUS', 'BOOT_TIME',
- 'TOTAL_PHYMEM'):
- continue
- if not name.startswith('_'):
- try:
- __import__(name)
- except ImportError:
- if name not in psutil.__all__:
- fun = getattr(psutil, name)
- if fun is None:
- continue
- if (fun.__doc__ is not None and
- 'deprecated' not in fun.__doc__.lower()):
- self.fail('%r not in psutil.__all__' % name)
-
- # Import 'star' will break if __all__ is inconsistent, see:
- # https://github.com/giampaolo/psutil/issues/656
- # Can't do `from psutil import *` as it won't work on python 3
- # so we simply iterate over __all__.
- for name in psutil.__all__:
- self.assertIn(name, dir_psutil)
-
- def test_version(self):
- self.assertEqual('.'.join([str(x) for x in psutil.version_info]),
- psutil.__version__)
-
- def test_memoize(self):
- from psutil._common import memoize
-
- @memoize
- def foo(*args, **kwargs):
- "foo docstring"
- calls.append(None)
- return (args, kwargs)
-
- calls = []
- # no args
- for x in range(2):
- ret = foo()
- expected = ((), {})
- self.assertEqual(ret, expected)
- self.assertEqual(len(calls), 1)
- # with args
- for x in range(2):
- ret = foo(1)
- expected = ((1, ), {})
- self.assertEqual(ret, expected)
- self.assertEqual(len(calls), 2)
- # with args + kwargs
- for x in range(2):
- ret = foo(1, bar=2)
- expected = ((1, ), {'bar': 2})
- self.assertEqual(ret, expected)
- self.assertEqual(len(calls), 3)
- # clear cache
- foo.cache_clear()
- ret = foo()
- expected = ((), {})
- self.assertEqual(ret, expected)
- self.assertEqual(len(calls), 4)
- # docstring
- self.assertEqual(foo.__doc__, "foo docstring")
-
- def test_isfile_strict(self):
- from psutil._common import isfile_strict
- this_file = os.path.abspath(__file__)
- assert isfile_strict(this_file)
- assert not isfile_strict(os.path.dirname(this_file))
- with mock.patch('psutil._common.os.stat',
- side_effect=OSError(errno.EPERM, "foo")):
- self.assertRaises(OSError, isfile_strict, this_file)
- with mock.patch('psutil._common.os.stat',
- side_effect=OSError(errno.EACCES, "foo")):
- self.assertRaises(OSError, isfile_strict, this_file)
- with mock.patch('psutil._common.os.stat',
- side_effect=OSError(errno.EINVAL, "foo")):
- assert not isfile_strict(this_file)
- with mock.patch('psutil._common.stat.S_ISREG', return_value=False):
- assert not isfile_strict(this_file)
-
- def test_serialization(self):
- def check(ret):
- if json is not None:
- json.loads(json.dumps(ret))
- a = pickle.dumps(ret)
- b = pickle.loads(a)
- self.assertEqual(ret, b)
-
- check(psutil.Process().as_dict())
- check(psutil.virtual_memory())
- check(psutil.swap_memory())
- check(psutil.cpu_times())
- check(psutil.cpu_times_percent(interval=0))
- check(psutil.net_io_counters())
- if LINUX and not os.path.exists('/proc/diskstats'):
- pass
- else:
- if not APPVEYOR:
- check(psutil.disk_io_counters())
- check(psutil.disk_partitions())
- check(psutil.disk_usage(os.getcwd()))
- check(psutil.users())
-
- def test_setup_script(self):
- here = os.path.abspath(os.path.dirname(__file__))
- setup_py = os.path.realpath(os.path.join(here, '..', 'setup.py'))
- module = imp.load_source('setup', setup_py)
- self.assertRaises(SystemExit, module.setup)
- self.assertEqual(module.get_version(), psutil.__version__)
-
- def test_ad_on_process_creation(self):
- # We are supposed to be able to instantiate Process also in case
- # of zombie processes or access denied.
- with mock.patch.object(psutil.Process, 'create_time',
- side_effect=psutil.AccessDenied) as meth:
- psutil.Process()
- assert meth.called
- with mock.patch.object(psutil.Process, 'create_time',
- side_effect=psutil.ZombieProcess(1)) as meth:
- psutil.Process()
- assert meth.called
- with mock.patch.object(psutil.Process, 'create_time',
- side_effect=ValueError) as meth:
- with self.assertRaises(ValueError):
- psutil.Process()
- assert meth.called
-
-
-# ===================================================================
-# --- Example script tests
-# ===================================================================
-
-class TestExampleScripts(unittest.TestCase):
- """Tests for scripts in the examples directory."""
-
- def assert_stdout(self, exe, args=None):
- exe = os.path.join(EXAMPLES_DIR, exe)
- if args:
- exe = exe + ' ' + args
- try:
- out = sh(sys.executable + ' ' + exe).strip()
- except RuntimeError as err:
- if 'AccessDenied' in str(err):
- return str(err)
- else:
- raise
- assert out, out
- return out
-
- def assert_syntax(self, exe, args=None):
- exe = os.path.join(EXAMPLES_DIR, exe)
- with open(exe, 'r') as f:
- src = f.read()
- ast.parse(src)
-
- def test_check_presence(self):
- # make sure all example scripts have a test method defined
- meths = dir(self)
- for name in os.listdir(EXAMPLES_DIR):
- if name.endswith('.py'):
- if 'test_' + os.path.splitext(name)[0] not in meths:
- # self.assert_stdout(name)
- self.fail('no test defined for %r script'
- % os.path.join(EXAMPLES_DIR, name))
-
- def test_disk_usage(self):
- self.assert_stdout('disk_usage.py')
-
- def test_free(self):
- self.assert_stdout('free.py')
-
- def test_meminfo(self):
- self.assert_stdout('meminfo.py')
-
- def test_process_detail(self):
- self.assert_stdout('process_detail.py')
-
- @unittest.skipIf(APPVEYOR, "can't find users on Appveyor")
- def test_who(self):
- self.assert_stdout('who.py')
-
- def test_ps(self):
- self.assert_stdout('ps.py')
-
- def test_pstree(self):
- self.assert_stdout('pstree.py')
-
- def test_netstat(self):
- self.assert_stdout('netstat.py')
-
- @unittest.skipIf(TRAVIS, "permission denied on travis")
- def test_ifconfig(self):
- self.assert_stdout('ifconfig.py')
-
- def test_pmap(self):
- self.assert_stdout('pmap.py', args=str(os.getpid()))
-
- @unittest.skipIf(ast is None,
- 'ast module not available on this python version')
- def test_killall(self):
- self.assert_syntax('killall.py')
-
- @unittest.skipIf(ast is None,
- 'ast module not available on this python version')
- def test_nettop(self):
- self.assert_syntax('nettop.py')
-
- @unittest.skipIf(ast is None,
- 'ast module not available on this python version')
- def test_top(self):
- self.assert_syntax('top.py')
-
- @unittest.skipIf(ast is None,
- 'ast module not available on this python version')
- def test_iotop(self):
- self.assert_syntax('iotop.py')
-
- def test_pidof(self):
- output = self.assert_stdout('pidof.py %s' % psutil.Process().name())
- self.assertIn(str(os.getpid()), output)
-
-
-def main():
- tests = []
- test_suite = unittest.TestSuite()
- tests.append(TestSystemAPIs)
- tests.append(TestProcess)
- tests.append(TestFetchAllProcesses)
- tests.append(TestMisc)
- tests.append(TestExampleScripts)
- tests.append(LimitedUserTestCase)
-
- if POSIX:
- from _posix import PosixSpecificTestCase
- tests.append(PosixSpecificTestCase)
-
- # import the specific platform test suite
- stc = None
- if LINUX:
- from _linux import LinuxSpecificTestCase as stc
- elif WINDOWS:
- from _windows import WindowsSpecificTestCase as stc
- from _windows import TestDualProcessImplementation
- tests.append(TestDualProcessImplementation)
- elif OSX:
- from _osx import OSXSpecificTestCase as stc
- elif BSD:
- from _bsd import BSDSpecificTestCase as stc
- elif SUNOS:
- from _sunos import SunOSSpecificTestCase as stc
- if stc is not None:
- tests.append(stc)
-
- for test_class in tests:
- test_suite.addTest(unittest.makeSuite(test_class))
- result = unittest.TextTestRunner(verbosity=2).run(test_suite)
- return result.wasSuccessful()
-
-if __name__ == '__main__':
- if not main():
- sys.exit(1)