summaryrefslogtreecommitdiffstats
path: root/python/psutil/test/_linux.py
diff options
context:
space:
mode:
Diffstat (limited to 'python/psutil/test/_linux.py')
-rw-r--r--python/psutil/test/_linux.py473
1 files changed, 473 insertions, 0 deletions
diff --git a/python/psutil/test/_linux.py b/python/psutil/test/_linux.py
new file mode 100644
index 000000000..c1927ea8b
--- /dev/null
+++ b/python/psutil/test/_linux.py
@@ -0,0 +1,473 @@
+#!/usr/bin/env python
+
+# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
+# Use of this source code is governed by a BSD-style license that can be
+# found in the LICENSE file.
+
+"""Linux specific tests. These are implicitly run by test_psutil.py."""
+
+from __future__ import division
+import contextlib
+import errno
+import fcntl
+import io
+import os
+import pprint
+import re
+import socket
+import struct
+import sys
+import tempfile
+import time
+import warnings
+
+try:
+ from unittest import mock # py3
+except ImportError:
+ import mock # requires "pip install mock"
+
+from test_psutil import POSIX, TOLERANCE, TRAVIS, LINUX
+from test_psutil import (skip_on_not_implemented, sh, get_test_subprocess,
+ retry_before_failing, get_kernel_version, unittest,
+ which, call_until)
+
+import psutil
+import psutil._pslinux
+from psutil._compat import PY3, u
+
+
+SIOCGIFADDR = 0x8915
+SIOCGIFCONF = 0x8912
+SIOCGIFHWADDR = 0x8927
+
+
+def get_ipv4_address(ifname):
+ ifname = ifname[:15]
+ if PY3:
+ ifname = bytes(ifname, 'ascii')
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ with contextlib.closing(s):
+ return socket.inet_ntoa(
+ fcntl.ioctl(s.fileno(),
+ SIOCGIFADDR,
+ struct.pack('256s', ifname))[20:24])
+
+
+def get_mac_address(ifname):
+ ifname = ifname[:15]
+ if PY3:
+ ifname = bytes(ifname, 'ascii')
+ s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ with contextlib.closing(s):
+ info = fcntl.ioctl(
+ s.fileno(), SIOCGIFHWADDR, struct.pack('256s', ifname))
+ if PY3:
+ def ord(x):
+ return x
+ else:
+ import __builtin__
+ ord = __builtin__.ord
+ return ''.join(['%02x:' % ord(char) for char in info[18:24]])[:-1]
+
+
+@unittest.skipUnless(LINUX, "not a Linux system")
+class LinuxSpecificTestCase(unittest.TestCase):
+
+ @unittest.skipIf(
+ POSIX and not hasattr(os, 'statvfs'),
+ reason="os.statvfs() function not available on this platform")
+ @skip_on_not_implemented()
+ def test_disks(self):
+ # test psutil.disk_usage() and psutil.disk_partitions()
+ # against "df -a"
+ def df(path):
+ out = sh('df -P -B 1 "%s"' % path).strip()
+ lines = out.split('\n')
+ lines.pop(0)
+ line = lines.pop(0)
+ dev, total, used, free = line.split()[:4]
+ if dev == 'none':
+ dev = ''
+ total, used, free = int(total), int(used), int(free)
+ return dev, total, used, free
+
+ for part in psutil.disk_partitions(all=False):
+ usage = psutil.disk_usage(part.mountpoint)
+ dev, total, used, free = df(part.mountpoint)
+ self.assertEqual(part.device, dev)
+ self.assertEqual(usage.total, total)
+ # 10 MB tollerance
+ if abs(usage.free - free) > 10 * 1024 * 1024:
+ self.fail("psutil=%s, df=%s" % (usage.free, free))
+ if abs(usage.used - used) > 10 * 1024 * 1024:
+ self.fail("psutil=%s, df=%s" % (usage.used, used))
+
+ def test_memory_maps(self):
+ sproc = get_test_subprocess()
+ time.sleep(1)
+ p = psutil.Process(sproc.pid)
+ maps = p.memory_maps(grouped=False)
+ pmap = sh('pmap -x %s' % p.pid).split('\n')
+ # get rid of header
+ del pmap[0]
+ del pmap[0]
+ while maps and pmap:
+ this = maps.pop(0)
+ other = pmap.pop(0)
+ addr, _, rss, dirty, mode, path = other.split(None, 5)
+ if not path.startswith('[') and not path.endswith(']'):
+ self.assertEqual(path, os.path.basename(this.path))
+ self.assertEqual(int(rss) * 1024, this.rss)
+ # test only rwx chars, ignore 's' and 'p'
+ self.assertEqual(mode[:3], this.perms[:3])
+
+ def test_vmem_total(self):
+ lines = sh('free').split('\n')[1:]
+ total = int(lines[0].split()[1]) * 1024
+ self.assertEqual(total, psutil.virtual_memory().total)
+
+ @retry_before_failing()
+ def test_vmem_used(self):
+ lines = sh('free').split('\n')[1:]
+ used = int(lines[0].split()[2]) * 1024
+ self.assertAlmostEqual(used, psutil.virtual_memory().used,
+ delta=TOLERANCE)
+
+ @retry_before_failing()
+ def test_vmem_free(self):
+ lines = sh('free').split('\n')[1:]
+ free = int(lines[0].split()[3]) * 1024
+ self.assertAlmostEqual(free, psutil.virtual_memory().free,
+ delta=TOLERANCE)
+
+ @retry_before_failing()
+ def test_vmem_buffers(self):
+ lines = sh('free').split('\n')[1:]
+ buffers = int(lines[0].split()[5]) * 1024
+ self.assertAlmostEqual(buffers, psutil.virtual_memory().buffers,
+ delta=TOLERANCE)
+
+ @retry_before_failing()
+ def test_vmem_cached(self):
+ lines = sh('free').split('\n')[1:]
+ cached = int(lines[0].split()[6]) * 1024
+ self.assertAlmostEqual(cached, psutil.virtual_memory().cached,
+ delta=TOLERANCE)
+
+ def test_swapmem_total(self):
+ lines = sh('free').split('\n')[1:]
+ total = int(lines[2].split()[1]) * 1024
+ self.assertEqual(total, psutil.swap_memory().total)
+
+ @retry_before_failing()
+ def test_swapmem_used(self):
+ lines = sh('free').split('\n')[1:]
+ used = int(lines[2].split()[2]) * 1024
+ self.assertAlmostEqual(used, psutil.swap_memory().used,
+ delta=TOLERANCE)
+
+ @retry_before_failing()
+ def test_swapmem_free(self):
+ lines = sh('free').split('\n')[1:]
+ free = int(lines[2].split()[3]) * 1024
+ self.assertAlmostEqual(free, psutil.swap_memory().free,
+ delta=TOLERANCE)
+
+ @unittest.skipIf(TRAVIS, "unknown failure on travis")
+ def test_cpu_times(self):
+ fields = psutil.cpu_times()._fields
+ kernel_ver = re.findall('\d+\.\d+\.\d+', os.uname()[2])[0]
+ kernel_ver_info = tuple(map(int, kernel_ver.split('.')))
+ if kernel_ver_info >= (2, 6, 11):
+ self.assertIn('steal', fields)
+ else:
+ self.assertNotIn('steal', fields)
+ if kernel_ver_info >= (2, 6, 24):
+ self.assertIn('guest', fields)
+ else:
+ self.assertNotIn('guest', fields)
+ if kernel_ver_info >= (3, 2, 0):
+ self.assertIn('guest_nice', fields)
+ else:
+ self.assertNotIn('guest_nice', fields)
+
+ def test_net_if_addrs_ips(self):
+ for name, addrs in psutil.net_if_addrs().items():
+ for addr in addrs:
+ if addr.family == psutil.AF_LINK:
+ self.assertEqual(addr.address, get_mac_address(name))
+ elif addr.family == socket.AF_INET:
+ self.assertEqual(addr.address, get_ipv4_address(name))
+ # TODO: test for AF_INET6 family
+
+ @unittest.skipUnless(which('ip'), "'ip' utility not available")
+ @unittest.skipIf(TRAVIS, "skipped on Travis")
+ def test_net_if_names(self):
+ out = sh("ip addr").strip()
+ nics = psutil.net_if_addrs()
+ found = 0
+ for line in out.split('\n'):
+ line = line.strip()
+ if re.search("^\d+:", line):
+ found += 1
+ name = line.split(':')[1].strip()
+ self.assertIn(name, nics.keys())
+ self.assertEqual(len(nics), found, msg="%s\n---\n%s" % (
+ pprint.pformat(nics), out))
+
+ @unittest.skipUnless(which("nproc"), "nproc utility not available")
+ def test_cpu_count_logical_w_nproc(self):
+ num = int(sh("nproc --all"))
+ self.assertEqual(psutil.cpu_count(logical=True), num)
+
+ @unittest.skipUnless(which("lscpu"), "lscpu utility not available")
+ def test_cpu_count_logical_w_lscpu(self):
+ out = sh("lscpu -p")
+ num = len([x for x in out.split('\n') if not x.startswith('#')])
+ self.assertEqual(psutil.cpu_count(logical=True), num)
+
+ # --- mocked tests
+
+ def test_virtual_memory_mocked_warnings(self):
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ with warnings.catch_warnings(record=True) as ws:
+ warnings.simplefilter("always")
+ ret = psutil._pslinux.virtual_memory()
+ assert m.called
+ self.assertEqual(len(ws), 1)
+ w = ws[0]
+ self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
+ self.assertIn(
+ "'cached', 'active' and 'inactive' memory stats couldn't "
+ "be determined", str(w.message))
+ self.assertEqual(ret.cached, 0)
+ self.assertEqual(ret.active, 0)
+ self.assertEqual(ret.inactive, 0)
+
+ def test_swap_memory_mocked_warnings(self):
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ with warnings.catch_warnings(record=True) as ws:
+ warnings.simplefilter("always")
+ ret = psutil._pslinux.swap_memory()
+ assert m.called
+ self.assertEqual(len(ws), 1)
+ w = ws[0]
+ self.assertTrue(w.filename.endswith('psutil/_pslinux.py'))
+ self.assertIn(
+ "'sin' and 'sout' swap memory stats couldn't "
+ "be determined", str(w.message))
+ self.assertEqual(ret.sin, 0)
+ self.assertEqual(ret.sout, 0)
+
+ def test_cpu_count_logical_mocked(self):
+ import psutil._pslinux
+ original = psutil._pslinux.cpu_count_logical()
+ # Here we want to mock os.sysconf("SC_NPROCESSORS_ONLN") in
+ # order to cause the parsing of /proc/cpuinfo and /proc/stat.
+ with mock.patch(
+ 'psutil._pslinux.os.sysconf', side_effect=ValueError) as m:
+ self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
+ assert m.called
+
+ # Let's have open() return emtpy data and make sure None is
+ # returned ('cause we mimick os.cpu_count()).
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ self.assertIsNone(psutil._pslinux.cpu_count_logical())
+ self.assertEqual(m.call_count, 2)
+ # /proc/stat should be the last one
+ self.assertEqual(m.call_args[0][0], '/proc/stat')
+
+ # Let's push this a bit further and make sure /proc/cpuinfo
+ # parsing works as expected.
+ with open('/proc/cpuinfo', 'rb') as f:
+ cpuinfo_data = f.read()
+ fake_file = io.BytesIO(cpuinfo_data)
+ with mock.patch('psutil._pslinux.open',
+ return_value=fake_file, create=True) as m:
+ self.assertEqual(psutil._pslinux.cpu_count_logical(), original)
+
+ def test_cpu_count_physical_mocked(self):
+ # Have open() return emtpy data and make sure None is returned
+ # ('cause we want to mimick os.cpu_count())
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ self.assertIsNone(psutil._pslinux.cpu_count_physical())
+ assert m.called
+
+ def test_proc_open_files_file_gone(self):
+ # simulates a file which gets deleted during open_files()
+ # execution
+ p = psutil.Process()
+ files = p.open_files()
+ with tempfile.NamedTemporaryFile():
+ # give the kernel some time to see the new file
+ call_until(p.open_files, "len(ret) != %i" % len(files))
+ with mock.patch('psutil._pslinux.os.readlink',
+ side_effect=OSError(errno.ENOENT, "")) as m:
+ files = p.open_files()
+ assert not files
+ assert m.called
+ # also simulate the case where os.readlink() returns EINVAL
+ # in which case psutil is supposed to 'continue'
+ with mock.patch('psutil._pslinux.os.readlink',
+ side_effect=OSError(errno.EINVAL, "")) as m:
+ self.assertEqual(p.open_files(), [])
+ assert m.called
+
+ def test_proc_terminal_mocked(self):
+ with mock.patch('psutil._pslinux._psposix._get_terminal_map',
+ return_value={}) as m:
+ self.assertIsNone(psutil._pslinux.Process(os.getpid()).terminal())
+ assert m.called
+
+ def test_proc_num_ctx_switches_mocked(self):
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ self.assertRaises(
+ NotImplementedError,
+ psutil._pslinux.Process(os.getpid()).num_ctx_switches)
+ assert m.called
+
+ def test_proc_num_threads_mocked(self):
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ self.assertRaises(
+ NotImplementedError,
+ psutil._pslinux.Process(os.getpid()).num_threads)
+ assert m.called
+
+ def test_proc_ppid_mocked(self):
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ self.assertRaises(
+ NotImplementedError,
+ psutil._pslinux.Process(os.getpid()).ppid)
+ assert m.called
+
+ def test_proc_uids_mocked(self):
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ self.assertRaises(
+ NotImplementedError,
+ psutil._pslinux.Process(os.getpid()).uids)
+ assert m.called
+
+ def test_proc_gids_mocked(self):
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ self.assertRaises(
+ NotImplementedError,
+ psutil._pslinux.Process(os.getpid()).gids)
+ assert m.called
+
+ def test_proc_cmdline_mocked(self):
+ # see: https://github.com/giampaolo/psutil/issues/639
+ p = psutil.Process()
+ fake_file = io.StringIO(u('foo\x00bar\x00'))
+ with mock.patch('psutil._pslinux.open',
+ return_value=fake_file, create=True) as m:
+ p.cmdline() == ['foo', 'bar']
+ assert m.called
+ fake_file = io.StringIO(u('foo\x00bar\x00\x00'))
+ with mock.patch('psutil._pslinux.open',
+ return_value=fake_file, create=True) as m:
+ p.cmdline() == ['foo', 'bar', '']
+ assert m.called
+
+ def test_proc_io_counters_mocked(self):
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ self.assertRaises(
+ NotImplementedError,
+ psutil._pslinux.Process(os.getpid()).io_counters)
+ assert m.called
+
+ def test_boot_time_mocked(self):
+ with mock.patch('psutil._pslinux.open', create=True) as m:
+ self.assertRaises(
+ RuntimeError,
+ psutil._pslinux.boot_time)
+ assert m.called
+
+ def test_users_mocked(self):
+ # Make sure ':0' and ':0.0' (returned by C ext) are converted
+ # to 'localhost'.
+ with mock.patch('psutil._pslinux.cext.users',
+ return_value=[('giampaolo', 'pts/2', ':0',
+ 1436573184.0, True)]) as m:
+ self.assertEqual(psutil.users()[0].host, 'localhost')
+ assert m.called
+ with mock.patch('psutil._pslinux.cext.users',
+ return_value=[('giampaolo', 'pts/2', ':0.0',
+ 1436573184.0, True)]) as m:
+ self.assertEqual(psutil.users()[0].host, 'localhost')
+ assert m.called
+ # ...otherwise it should be returned as-is
+ with mock.patch('psutil._pslinux.cext.users',
+ return_value=[('giampaolo', 'pts/2', 'foo',
+ 1436573184.0, True)]) as m:
+ self.assertEqual(psutil.users()[0].host, 'foo')
+ assert m.called
+
+ def test_disk_partitions_mocked(self):
+ # Test that ZFS partitions are returned.
+ with open("/proc/filesystems", "r") as f:
+ data = f.read()
+ if 'zfs' in data:
+ for part in psutil.disk_partitions():
+ if part.fstype == 'zfs':
+ break
+ else:
+ self.fail("couldn't find any ZFS partition")
+ else:
+ # No ZFS partitions on this system. Let's fake one.
+ fake_file = io.StringIO(u("nodev\tzfs\n"))
+ with mock.patch('psutil._pslinux.open',
+ return_value=fake_file, create=True) as m1:
+ with mock.patch(
+ 'psutil._pslinux.cext.disk_partitions',
+ return_value=[('/dev/sdb3', '/', 'zfs', 'rw')]) as m2:
+ ret = psutil.disk_partitions()
+ assert m1.called
+ assert m2.called
+ assert ret
+ self.assertEqual(ret[0].fstype, 'zfs')
+
+ # --- tests for specific kernel versions
+
+ @unittest.skipUnless(
+ get_kernel_version() >= (2, 6, 36),
+ "prlimit() not available on this Linux kernel version")
+ def test_prlimit_availability(self):
+ # prlimit() should be available starting from kernel 2.6.36
+ p = psutil.Process(os.getpid())
+ p.rlimit(psutil.RLIMIT_NOFILE)
+ # if prlimit() is supported *at least* these constants should
+ # be available
+ self.assertTrue(hasattr(psutil, "RLIM_INFINITY"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_AS"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_CORE"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_CPU"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_DATA"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_FSIZE"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_LOCKS"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_MEMLOCK"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_NOFILE"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_NPROC"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_RSS"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_STACK"))
+
+ @unittest.skipUnless(
+ get_kernel_version() >= (3, 0),
+ "prlimit constants not available on this Linux kernel version")
+ def test_resource_consts_kernel_v(self):
+ # more recent constants
+ self.assertTrue(hasattr(psutil, "RLIMIT_MSGQUEUE"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_NICE"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_RTPRIO"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_RTTIME"))
+ self.assertTrue(hasattr(psutil, "RLIMIT_SIGPENDING"))
+
+
+def main():
+ test_suite = unittest.TestSuite()
+ test_suite.addTest(unittest.makeSuite(LinuxSpecificTestCase))
+ result = unittest.TextTestRunner(verbosity=2).run(test_suite)
+ return result.wasSuccessful()
+
+if __name__ == '__main__':
+ if not main():
+ sys.exit(1)