#!/usr/bin/env python

# Copyright (c) 2009, Giampaolo Rodola'. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.

"""POSIX specific tests. These are implicitly run by test_psutil.py."""

import datetime
import os
import subprocess
import sys
import time

import psutil

from psutil._compat import PY3, callable
from test_psutil import LINUX, SUNOS, OSX, BSD, PYTHON, POSIX, TRAVIS
from test_psutil import (get_test_subprocess, skip_on_access_denied,
                         retry_before_failing, reap_children, sh, unittest,
                         get_kernel_version, wait_for_pid)


def ps(cmd):
    """Run a ps command with a -o argument and return the value of interest.

    The command is adapted to the current platform before being run:
    on non-Linux systems the " --no-headers " flag is removed, and on
    SunOS "-o command" / "-o start" are replaced with "-o comm" /
    "-o stime".

    Returns the output converted to int when possible, otherwise the
    output string itself.
    """
    if not LINUX:
        cmd = cmd.replace(" --no-headers ", " ")
    if SUNOS:
        cmd = cmd.replace("-o command", "-o comm")
        cmd = cmd.replace("-o start", "-o stime")
    p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
    output = p.communicate()[0].strip()
    if PY3:
        output = str(output, sys.stdout.encoding)
    if not LINUX:
        # --no-headers was stripped above, so the first line is the
        # column header and the value is on the second line.
        output = output.split('\n')[1].strip()
    try:
        return int(output)
    except ValueError:
        return output


@unittest.skipUnless(POSIX, "not a POSIX system")
class PosixSpecificTestCase(unittest.TestCase):
    """Compare psutil results against 'ps' command line utility."""

    @classmethod
    def setUpClass(cls):
        """Spawn the python subprocess all the 'ps' tests inspect."""
        cls.pid = get_test_subprocess([PYTHON, "-E", "-O"],
                                      stdin=subprocess.PIPE).pid
        wait_for_pid(cls.pid)

    @classmethod
    def tearDownClass(cls):
        """Clean up the subprocess started by setUpClass."""
        reap_children()

    # for ps -o arguments see: http://unixhelp.ed.ac.uk/CGI/man-cgi?ps

    def test_process_parent_pid(self):
        """Process.ppid() must match 'ps -o ppid'."""
        ppid_ps = ps("ps --no-headers -o ppid -p %s" % self.pid)
        ppid_psutil = psutil.Process(self.pid).ppid()
        self.assertEqual(ppid_ps, ppid_psutil)

    def test_process_uid(self):
        """Process.uids().real must match 'ps -o uid'."""
        uid_ps = ps("ps --no-headers -o uid -p %s" % self.pid)
        uid_psutil = psutil.Process(self.pid).uids().real
        self.assertEqual(uid_ps, uid_psutil)

    def test_process_gid(self):
        """Process.gids().real must match 'ps -o rgid'."""
        gid_ps = ps("ps --no-headers -o rgid -p %s" % self.pid)
        gid_psutil = psutil.Process(self.pid).gids().real
        self.assertEqual(gid_ps, gid_psutil)

    def test_process_username(self):
        """Process.username() must match 'ps -o user'."""
        username_ps = ps("ps --no-headers -o user -p %s" % self.pid)
        username_psutil = psutil.Process(self.pid).username()
        self.assertEqual(username_ps, username_psutil)

    @skip_on_access_denied()
    @retry_before_failing()
    def test_process_rss_memory(self):
        """First memory_info() field (in KB) must match 'ps -o rss'."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        rss_ps = ps("ps --no-headers -o rss -p %s" % self.pid)
        rss_psutil = psutil.Process(self.pid).memory_info()[0] / 1024
        self.assertEqual(rss_ps, rss_psutil)

    @skip_on_access_denied()
    @retry_before_failing()
    def test_process_vsz_memory(self):
        """Second memory_info() field (in KB) must match 'ps -o vsz'."""
        # give python interpreter some time to properly initialize
        # so that the results are the same
        time.sleep(0.1)
        vsz_ps = ps("ps --no-headers -o vsz -p %s" % self.pid)
        vsz_psutil = psutil.Process(self.pid).memory_info()[1] / 1024
        self.assertEqual(vsz_ps, vsz_psutil)

    def test_process_name(self):
        """Process.name() must match the basename of the ps command."""
        # use command + arg since "comm" keyword not supported on all platforms
        name_ps = ps("ps --no-headers -o command -p %s" % (
            self.pid)).split(' ')[0]
        # remove path if there is any, from the command
        name_ps = os.path.basename(name_ps).lower()
        name_psutil = psutil.Process(self.pid).name().lower()
        self.assertEqual(name_ps, name_psutil)

    @unittest.skipIf(OSX or BSD, 'ps -o start not available')
    def test_process_create_time(self):
        """Process.create_time() must match 'ps -o start' as HH:MM:SS."""
        time_ps = ps("ps --no-headers -o start -p %s" % self.pid).split(' ')[0]
        time_psutil = psutil.Process(self.pid).create_time()
        time_psutil_tstamp = datetime.datetime.fromtimestamp(
            time_psutil).strftime("%H:%M:%S")
        # sometimes ps shows the time rounded up instead of down, so we check
        # for both possible values
        round_time_psutil = round(time_psutil)
        round_time_psutil_tstamp = datetime.datetime.fromtimestamp(
            round_time_psutil).strftime("%H:%M:%S")
        self.assertIn(time_ps, [time_psutil_tstamp, round_time_psutil_tstamp])

    def test_process_exe(self):
        """Process.exe() must match the first token of the ps command.

        When the two differ, psutil's path is still accepted as long as
        it starts with the path reported by ps.
        """
        ps_pathname = ps("ps --no-headers -o command -p %s" %
                         self.pid).split(' ')[0]
        psutil_pathname = psutil.Process(self.pid).exe()
        try:
            self.assertEqual(ps_pathname, psutil_pathname)
        except AssertionError:
            # certain platforms such as BSD are more accurate returning:
            # "/usr/local/bin/python2.7"
            # ...instead of:
            # "/usr/local/bin/python"
            # We do not want to consider this difference in accuracy
            # an error.
            # Truncate psutil's (longer) path to the length of ps's one;
            # slicing ps_pathname by its own length would always compare
            # equal and make this check meaningless.
            adjusted_ps_pathname = psutil_pathname[:len(ps_pathname)]
            self.assertEqual(ps_pathname, adjusted_ps_pathname)

    def test_process_cmdline(self):
        """Space-joined Process.cmdline() must match 'ps -o command'."""
        ps_cmdline = ps("ps --no-headers -o command -p %s" % self.pid)
        psutil_cmdline = " ".join(psutil.Process(self.pid).cmdline())
        if SUNOS:
            # ps on Solaris only shows the first part of the cmdline
            psutil_cmdline = psutil_cmdline.split(" ")[0]
        self.assertEqual(ps_cmdline, psutil_cmdline)

    @retry_before_failing()
    def test_pids(self):
        """psutil.pids() must list the same PIDs as 'ps ax'."""
        # Note: this test might fail if the OS is starting/killing
        # other processes in the meantime
        if SUNOS:
            cmd = ["ps", "ax"]
        else:
            cmd = ["ps", "ax", "-o", "pid"]
        p = get_test_subprocess(cmd, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        pids_ps = []
        # the first line is the header, hence it is skipped
        for line in output.split('\n')[1:]:
            if line:
                pid = int(line.split()[0].strip())
                pids_ps.append(pid)
        # remove ps subprocess pid which is supposed to be dead in meantime
        pids_ps.remove(p.pid)
        pids_psutil = psutil.pids()
        pids_ps.sort()
        pids_psutil.sort()

        # on OSX ps doesn't show pid 0
        if OSX and 0 not in pids_ps:
            pids_ps.insert(0, 0)

        if pids_ps != pids_psutil:
            difference = [x for x in pids_psutil if x not in pids_ps] + \
                         [x for x in pids_ps if x not in pids_psutil]
            self.fail("difference: " + str(difference))

    # for some reason ifconfig -a does not report all interfaces
    # returned by psutil
    @unittest.skipIf(SUNOS, "test not reliable on SUNOS")
    @unittest.skipIf(TRAVIS, "test not reliable on Travis")
    def test_nic_names(self):
        """Every NIC known to psutil must appear in 'ifconfig -a'."""
        p = subprocess.Popen("ifconfig -a", shell=True, stdout=subprocess.PIPE)
        output = p.communicate()[0].strip()
        if PY3:
            output = str(output, sys.stdout.encoding)
        for nic in psutil.net_io_counters(pernic=True):
            # output.split() yields whitespace-separated tokens; a NIC is
            # considered found if any token starts with its name.
            for line in output.split():
                if line.startswith(nic):
                    break
            else:
                self.fail(
                    "couldn't find %s nic in 'ifconfig -a' output\n%s" % (
                        nic, output))

    @retry_before_failing()
    def test_users(self):
        """psutil.users() must agree with the output of 'who'."""
        out = sh("who")
        if not out.strip():
            # With no output, ''.split('\n') yields [''] and indexing the
            # fields of that empty line would raise IndexError.
            self.skipTest("no users on this system")
        lines = out.split('\n')
        users = [x.split()[0] for x in lines]
        terminals = [x.split()[1] for x in lines]
        # Query psutil once so the count check and the per-user checks
        # look at the same snapshot.
        psutil_users = psutil.users()
        self.assertEqual(len(users), len(psutil_users))
        for u in psutil_users:
            self.assertTrue(u.name in users, u.name)
            self.assertTrue(u.terminal in terminals, u.terminal)

    def test_fds_open(self):
        """Accessing Process attributes must not change num_fds() by > 1."""
        # Note: this fails from time to time; I'm keen on thinking
        # it doesn't mean something is broken
        def call(p, name):
            """Look up attribute `name` on `p`, calling it if callable."""
            args = ()
            attr = getattr(p, name, None)
            if attr is not None and callable(attr):
                if name == 'rlimit':
                    args = (psutil.RLIMIT_NOFILE,)
                attr(*args)

        p = psutil.Process(os.getpid())
        failures = []
        ignored_names = ['terminate', 'kill', 'suspend', 'resume', 'nice',
                         'send_signal', 'wait', 'children', 'as_dict']
        if LINUX and get_kernel_version() < (2, 6, 36):
            ignored_names.append('rlimit')
        if LINUX and get_kernel_version() < (2, 6, 23):
            ignored_names.append('num_ctx_switches')
        for name in dir(psutil.Process):
            if name.startswith('_') or name in ignored_names:
                continue
            try:
                num1 = p.num_fds()
                for x in range(2):
                    call(p, name)
                num2 = p.num_fds()
            except psutil.AccessDenied:
                # not enough privileges to exercise this attribute
                pass
            else:
                if abs(num2 - num1) > 1:
                    fail = "failure while processing Process.%s method " \
                           "(before=%s, after=%s)" % (name, num1, num2)
                    failures.append(fail)
        if failures:
            self.fail('\n' + '\n'.join(failures))


def main():
    """Run PosixSpecificTestCase verbosely; return True on success."""
    test_suite = unittest.TestSuite()
    # loadTestsFromTestCase replaces unittest.makeSuite, which is
    # deprecated and no longer available on recent Python versions.
    test_suite.addTest(
        unittest.defaultTestLoader.loadTestsFromTestCase(
            PosixSpecificTestCase))
    result = unittest.TextTestRunner(verbosity=2).run(test_suite)
    return result.wasSuccessful()


if __name__ == '__main__':
    if not main():
        sys.exit(1)